You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by md...@apache.org on 2017/07/17 22:11:24 UTC
[1/2] usergrid git commit: move deletes to new delete queue -- read
repair will fix attempts to access deleted entities and connections,
so indexing and collection deletes can proceed more slowly than other types of
changes
Repository: usergrid
Updated Branches:
refs/heads/collectionDelete 99ba349c8 -> b6d14069a
move deletes to new delete queue -- read repair will fix attempts to access deleted entities and connections, so indexing and collection deletes can proceed more slowly than other types of changes
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/221b99dd
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/221b99dd
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/221b99dd
Branch: refs/heads/collectionDelete
Commit: 221b99dd9d81968a5629647771a6eaf83c4177be
Parents: 99ba349
Author: Mike Dunker <md...@google.com>
Authored: Mon Jul 10 08:03:52 2017 -0700
Committer: Mike Dunker <md...@google.com>
Committed: Mon Jul 10 08:03:52 2017 -0700
----------------------------------------------------------------------
.../asyncevents/AsyncEventQueueType.java | 35 +++
.../asyncevents/AsyncEventService.java | 4 +-
.../asyncevents/AsyncEventServiceImpl.java | 315 ++++++++-----------
.../index/IndexProcessorFig.java | 18 ++
.../corepersistence/index/ReIndexAction.java | 5 +-
.../index/ReIndexServiceImpl.java | 3 +-
.../read/traverse/AbstractReadGraphFilter.java | 11 +-
.../AbstractReadReverseGraphFilter.java | 11 +-
.../index/CollectionVersionTest.java | 9 +
9 files changed, 220 insertions(+), 191 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/221b99dd/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventQueueType.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventQueueType.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventQueueType.java
new file mode 100644
index 0000000..4b91e17
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventQueueType.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents;
+
+/**
+ * Identifies one of the asynchronous event queues: regular, utility or delete.
+ * {@link #toString()} returns the lower-case display name given to each constant.
+ */
+public enum AsyncEventQueueType {
+ REGULAR("regular"), UTILITY("utility"), DELETE("delete");
+
+ // enum constants are shared instances, so their state is kept immutable
+ private final String displayName;
+
+ /**
+ * @param displayName the text returned by {@link #toString()} for this constant
+ */
+ AsyncEventQueueType(String displayName) {
+ this.displayName = displayName;
+ }
+
+ /**
+ * @return the display name of this queue type ("regular", "utility" or "delete")
+ */
+ @Override
+ public String toString() {
+ return displayName;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/usergrid/blob/221b99dd/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index 5fe4295..7ce208f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -85,9 +85,9 @@ public interface AsyncEventService extends ReIndexAction {
/**
*
* @param indexOperationMessage
- * @param forUtilityQueue
+ * @param queueType
*/
- void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage, boolean forUtilityQueue);
+ void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage, AsyncEventQueueType queueType);
/**
* @param applicationScope
http://git-wip-us.apache.org/repos/asf/usergrid/blob/221b99dd/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 257e172..e33865e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -103,13 +103,16 @@ public class AsyncEventServiceImpl implements AsyncEventService {
public int MAX_TAKE = 10;
public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars
public static final String QUEUE_NAME_UTILITY = "utility"; //keep this short as AWS limits queue name size to 80 chars
+ public static final String QUEUE_NAME_DELETE = "delete";
public static final String DEAD_LETTER_SUFFIX = "_dead";
private final LegacyQueueManager indexQueue;
private final LegacyQueueManager utilityQueue;
+ private final LegacyQueueManager deleteQueue;
private final LegacyQueueManager indexQueueDead;
private final LegacyQueueManager utilityQueueDead;
+ private final LegacyQueueManager deleteQueueDead;
private final IndexProcessorFig indexProcessorFig;
private final LegacyQueueFig queueFig;
private final CollectionVersionFig collectionVersionFig;
@@ -133,8 +136,10 @@ public class AsyncEventServiceImpl implements AsyncEventService {
private final Counter indexErrorCounter;
private final AtomicLong counter = new AtomicLong();
private final AtomicLong counterUtility = new AtomicLong();
+ private final AtomicLong counterDelete = new AtomicLong();
private final AtomicLong counterIndexDead = new AtomicLong();
private final AtomicLong counterUtilityDead = new AtomicLong();
+ private final AtomicLong counterDeleteDead = new AtomicLong();
private final AtomicLong inFlight = new AtomicLong();
private final Histogram messageCycle;
private final MapManager esMapPersistence;
@@ -177,16 +182,24 @@ public class AsyncEventServiceImpl implements AsyncEventService {
LegacyQueueScope utilityQueueScope =
new LegacyQueueScopeImpl(QUEUE_NAME_UTILITY, LegacyQueueScope.RegionImplementation.ALL);
+ LegacyQueueScope deleteQueueScope =
+ new LegacyQueueScopeImpl(QUEUE_NAME_DELETE, LegacyQueueScope.RegionImplementation.ALL);
+
LegacyQueueScope indexQueueDeadScope =
new LegacyQueueScopeImpl(QUEUE_NAME, LegacyQueueScope.RegionImplementation.ALL, true);
LegacyQueueScope utilityQueueDeadScope =
new LegacyQueueScopeImpl(QUEUE_NAME_UTILITY, LegacyQueueScope.RegionImplementation.ALL, true);
+ LegacyQueueScope deleteQueueDeadScope =
+ new LegacyQueueScopeImpl(QUEUE_NAME_DELETE, LegacyQueueScope.RegionImplementation.ALL, true);
+
this.indexQueue = queueManagerFactory.getQueueManager(indexQueueScope);
this.utilityQueue = queueManagerFactory.getQueueManager(utilityQueueScope);
+ this.deleteQueue = queueManagerFactory.getQueueManager(deleteQueueScope);
this.indexQueueDead = queueManagerFactory.getQueueManager(indexQueueDeadScope);
this.utilityQueueDead = queueManagerFactory.getQueueManager(utilityQueueDeadScope);
+ this.deleteQueueDead = queueManagerFactory.getQueueManager(deleteQueueDeadScope);
this.indexProcessorFig = indexProcessorFig;
this.queueFig = queueFig;
@@ -211,24 +224,73 @@ public class AsyncEventServiceImpl implements AsyncEventService {
start();
}
+ /**
+ * Maps a queue type to the name constant of the queue it uses.
+ *
+ * @param queueType the queue type to look up
+ * @return QUEUE_NAME, QUEUE_NAME_UTILITY or QUEUE_NAME_DELETE
+ * @throws IllegalArgumentException if the queue type is not REGULAR, UTILITY or DELETE
+ */
+ private String getQueueName(AsyncEventQueueType queueType) {
+ if (queueType == AsyncEventQueueType.REGULAR) {
+ return QUEUE_NAME;
+ }
+ if (queueType == AsyncEventQueueType.UTILITY) {
+ return QUEUE_NAME_UTILITY;
+ }
+ if (queueType == AsyncEventQueueType.DELETE) {
+ return QUEUE_NAME_DELETE;
+ }
+ throw new IllegalArgumentException("Invalid queue type: " + queueType.toString());
+ }
+
+ /**
+ * Returns the live (non dead letter) queue manager for the given queue type.
+ *
+ * @param queueType the queue type to look up
+ * @return the queue manager for that type
+ */
+ private LegacyQueueManager getQueue(AsyncEventQueueType queueType) {
+ final boolean isDeadQueue = false;
+ return getQueue(queueType, isDeadQueue);
+ }
+
+ /**
+ * Returns the queue manager for the given queue type, choosing between the
+ * live queue and its dead letter queue.
+ *
+ * @param queueType the queue type to look up
+ * @param isDeadQueue true to get the dead letter queue for that type
+ * @return the matching queue manager
+ * @throws IllegalArgumentException if the queue type is not REGULAR, UTILITY or DELETE
+ */
+ private LegacyQueueManager getQueue(AsyncEventQueueType queueType, boolean isDeadQueue) {
+ if (queueType == AsyncEventQueueType.REGULAR) {
+ return isDeadQueue ? indexQueueDead : indexQueue;
+ }
+ if (queueType == AsyncEventQueueType.UTILITY) {
+ return isDeadQueue ? utilityQueueDead : utilityQueue;
+ }
+ if (queueType == AsyncEventQueueType.DELETE) {
+ return isDeadQueue ? deleteQueueDead : deleteQueue;
+ }
+ throw new IllegalArgumentException("Invalid queue type: " + queueType.toString());
+ }
+
+ /**
+ * Returns the counter associated with the given queue type, choosing between
+ * the counter for the live queue and the one for its dead letter queue.
+ *
+ * @param queueType the queue type to look up
+ * @param isDeadQueue true to get the dead letter counter for that type
+ * @return the matching counter
+ * @throws IllegalArgumentException if the queue type is not REGULAR, UTILITY or DELETE
+ */
+ private AtomicLong getCounter(AsyncEventQueueType queueType, boolean isDeadQueue) {
+ if (queueType == AsyncEventQueueType.REGULAR) {
+ return isDeadQueue ? counterIndexDead : counter;
+ }
+ if (queueType == AsyncEventQueueType.UTILITY) {
+ return isDeadQueue ? counterUtilityDead : counterUtility;
+ }
+ if (queueType == AsyncEventQueueType.DELETE) {
+ return isDeadQueue ? counterDeleteDead : counterDelete;
+ }
+ throw new IllegalArgumentException("Invalid queue type: " + queueType.toString());
+ }
+
+
/**
* Offer the EntityIdScope to SQS
*/
private void offer(final Serializable operation) {
- offer(operation, false);
+ offer(operation, AsyncEventQueueType.REGULAR);
}
- private void offer(final Serializable operation, boolean forUtilityQueue) {
+ private void offer(final Serializable operation, AsyncEventQueueType queueType) {
final Timer.Context timer = this.writeTimer.time();
try {
//signal to SQS
- if (forUtilityQueue) {
- this.indexQueue.sendMessageToLocalRegion(operation);
- } else {
- this.indexQueue.sendMessageToLocalRegion(operation);
- }
+ getQueue(queueType).sendMessageToLocalRegion(operation);
} catch (IOException e) {
throw new RuntimeException("Unable to queue message", e);
} finally {
@@ -238,16 +300,12 @@ public class AsyncEventServiceImpl implements AsyncEventService {
}
- private void offerTopic(final Serializable operation, boolean forUtilityQueue) {
+ private void offerTopic(final Serializable operation, AsyncEventQueueType queueType) {
final Timer.Context timer = this.writeTimer.time();
try {
//signal to SQS
- if (forUtilityQueue) {
- this.utilityQueue.sendMessageToAllRegions(operation);
- } else {
- this.indexQueue.sendMessageToAllRegions(operation);
- }
+ getQueue(queueType).sendMessageToAllRegions(operation);
}
catch ( IOException e ) {
throw new RuntimeException( "Unable to queue message", e );
@@ -258,15 +316,11 @@ public class AsyncEventServiceImpl implements AsyncEventService {
}
- private void offerBatch(final List operations, boolean forUtilityQueue){
+ private void offerBatch(final List operations, AsyncEventQueueType queueType){
final Timer.Context timer = this.writeTimer.time();
try {
//signal to SQS
- if( forUtilityQueue ){
- this.utilityQueue.sendMessages(operations);
- }else{
- this.indexQueue.sendMessages(operations);
- }
+ getQueue(queueType).sendMessages(operations);
} catch (IOException e) {
throw new RuntimeException("Unable to queue message", e);
} finally {
@@ -274,78 +328,24 @@ public class AsyncEventServiceImpl implements AsyncEventService {
}
}
- private void offerBatchToUtilityQueue(final List operations){
- try {
- //signal to SQS
- this.utilityQueue.sendMessages(operations);
- } catch (IOException e) {
- throw new RuntimeException("Unable to queue message", e);
- }
- }
-
/**
* Take message
*/
- private List<LegacyQueueMessage> take() {
-
- final Timer.Context timer = this.readTimer.time();
-
- try {
- return indexQueue.getMessages(MAX_TAKE, AsyncEvent.class);
- }
- finally {
- //stop our timer
- timer.stop();
- }
- }
-
- /**
- * Take message from SQS utility queue
- */
- private List<LegacyQueueMessage> takeFromUtilityQueue() {
-
- final Timer.Context timer = this.readTimer.time();
-
- try {
- return utilityQueue.getMessages(MAX_TAKE, AsyncEvent.class);
- }
- finally {
- //stop our timer
- timer.stop();
- }
- }
-
- /**
- * Take message from index dead letter queue
- */
- private List<LegacyQueueMessage> takeFromIndexDeadQueue() {
+ private List<LegacyQueueMessage> take(AsyncEventQueueType queueType, boolean isDeadQueue) {
final Timer.Context timer = this.readTimer.time();
try {
- return indexQueueDead.getMessages(MAX_TAKE, AsyncEvent.class);
+ return getQueue(queueType, isDeadQueue).getMessages(MAX_TAKE, AsyncEvent.class);
}
finally {
//stop our timer
timer.stop();
}
}
-
- /**
- * Take message from SQS utility dead letter queue
- */
- private List<LegacyQueueMessage> takeFromUtilityDeadQueue() {
-
- final Timer.Context timer = this.readTimer.time();
-
- try {
- return utilityQueueDead.getMessages(MAX_TAKE, AsyncEvent.class);
- }
- finally {
- //stop our timer
- timer.stop();
- }
+ private List<LegacyQueueMessage> take(AsyncEventQueueType queueType) {
+ return take(queueType, false);
}
@@ -376,42 +376,20 @@ public class AsyncEventServiceImpl implements AsyncEventService {
}
}
-
- /**
- * calls the event handlers and returns a result with information on whether
- * it needs to be ack'd and whether it needs to be indexed
- * Ack message in SQS
- */
- public void ackUtilityQueue(final List<LegacyQueueMessage> messages) {
- try{
- utilityQueue.commitMessages( messages );
- }catch(Exception e){
- throw new RuntimeException("Unable to ack messages", e);
+ /**
+ * Ack the given messages on the queue selected by the queue type and dead letter flag.
+ * Messages from the regular (non dead letter) queue are handed to {@code ack(messages)};
+ * all other queues have the messages committed directly on their queue manager.
+ *
+ * @param messages the messages to ack
+ * @param queueType the type of queue the messages were taken from
+ * @param isDeadQueue true if the messages were taken from the dead letter queue for that type
+ * @throws RuntimeException if committing the messages fails
+ */
+ public void ack(final List<LegacyQueueMessage> messages, AsyncEventQueueType queueType, boolean isDeadQueue) {
+ if (queueType == AsyncEventQueueType.REGULAR && !isDeadQueue) {
+ // different functionality
+ ack(messages);
+ // ack(messages) is the complete handling for the regular queue; returning here
+ // keeps the same messages from being committed a second time below
+ return;
}
- }
-
- /**
- * ack messages in index dead letter queue
- */
- public void ackIndexDeadQueue(final List<LegacyQueueMessage> messages) {
- try{
- indexQueueDead.commitMessages( messages );
- }catch(Exception e){
- throw new RuntimeException("Unable to ack messages", e);
+ try {
+ getQueue(queueType, isDeadQueue).commitMessages( messages );
}
- }
-
- /**
- * ack messages in utility dead letter queue
- */
- public void ackUtilityDeadQueue(final List<LegacyQueueMessage> messages) {
- try{
- utilityQueueDead.commitMessages( messages );
- }catch(Exception e){
+ catch (Exception e) {
throw new RuntimeException("Unable to ack messages", e);
}
}
+
/**
* calls the event handlers and returns a result with information on whether
* it needs to be ack'd and whether it needs to be indexed
@@ -555,7 +533,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
applicationScope.getApplication().getUuid(), applicationScope.getApplication().getType());
offerTopic( new InitializeApplicationIndexEvent( queueFig.getPrimaryRegion(),
- new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ), false);
+ new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ), AsyncEventQueueType.REGULAR);
}
@@ -645,7 +623,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
edge.getType(), edge.getTargetNode().getUuid(), edge.getTargetNode().getType());
// sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op
- offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ) );
+ offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ), AsyncEventQueueType.DELETE );
}
private IndexOperationMessage handleEdgeDelete(final LegacyQueueMessage message) {
@@ -678,9 +656,9 @@ public class AsyncEventServiceImpl implements AsyncEventService {
/**
* Queue up an indexOperationMessage for multi region execution
* @param indexOperationMessage
- * @param forUtilityQueue
+ * @param queueType
*/
- public void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage, boolean forUtilityQueue) {
+ public void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage, AsyncEventQueueType queueType) {
// don't try to produce something with nothing
if(indexOperationMessage == null || indexOperationMessage.isEmpty()){
@@ -706,7 +684,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
logger.trace("Offering ElasticsearchIndexEvent for message {}", newMessageId );
- offerTopic( elasticsearchIndexEvent, forUtilityQueue );
+ offerTopic( elasticsearchIndexEvent, queueType );
}
private void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent)
@@ -782,7 +760,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
applicationScope.getApplication().getUuid(), entityId.getUuid(), entityId.getType());
offerTopic( new DeIndexOldVersionsEvent( queueFig.getPrimaryRegion(),
- new EntityIdScope( applicationScope, entityId), markedVersion), false);
+ new EntityIdScope( applicationScope, entityId), markedVersion), AsyncEventQueueType.DELETE);
}
@@ -844,7 +822,8 @@ public class AsyncEventServiceImpl implements AsyncEventService {
}
// sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op
- offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ) );
+ offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ) ,
+ AsyncEventQueueType.DELETE);
}
private IndexOperationMessage handleEntityDelete(final LegacyQueueMessage message) {
@@ -883,7 +862,8 @@ public class AsyncEventServiceImpl implements AsyncEventService {
}
// sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op
- offer(new CollectionDeleteEvent(queueFig.getPrimaryRegion(), collectionScope, collectionVersion), true);
+ offer(new CollectionDeleteEvent(queueFig.getPrimaryRegion(), collectionScope, collectionVersion),
+ AsyncEventQueueType.DELETE);
}
private void handleCollectionDelete(final LegacyQueueMessage message) {
@@ -932,7 +912,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
offer(new EntityDeleteEvent(queueFig.getPrimaryRegion(),
new EntityIdScope(applicationScope, edgeScope.getEdge().getTargetNode()),false),
- true);
+ AsyncEventQueueType.DELETE);
count.incrementAndGet();
}).toBlocking().lastOrDefault(null);
@@ -940,7 +920,8 @@ public class AsyncEventServiceImpl implements AsyncEventService {
if (count.intValue() >= maxDeletes) {
// requeue collection delete for next chunk of deletes
- offer (new CollectionDeleteEvent(queueFig.getPrimaryRegion(), collectionScope, collectionVersion), true);
+ offer (new CollectionDeleteEvent(queueFig.getPrimaryRegion(), collectionScope, collectionVersion),
+ AsyncEventQueueType.DELETE);
}
}
@@ -965,29 +946,39 @@ public class AsyncEventServiceImpl implements AsyncEventService {
public void start() {
final int indexCount = indexProcessorFig.getWorkerCount();
final int utilityCount = indexProcessorFig.getWorkerCountUtility();
+ final int deleteCount = indexProcessorFig.getWorkerCountDelete();
final int indexDeadCount = indexProcessorFig.getWorkerCountDeadLetter();
final int utilityDeadCount = indexProcessorFig.getWorkerCountUtilityDeadLetter();
+ final int deleteDeadCount = indexProcessorFig.getWorkerCountDeleteDeadLetter();
for (int i = 0; i < indexCount; i++) {
- startWorker(QUEUE_NAME);
+ startWorker(AsyncEventQueueType.REGULAR);
}
for (int i = 0; i < utilityCount; i++) {
- startWorker(QUEUE_NAME_UTILITY);
+ startWorker(AsyncEventQueueType.UTILITY);
+ }
+
+ for (int i = 0; i < deleteCount; i++) {
+ startWorker(AsyncEventQueueType.DELETE);
}
if( indexQueue instanceof SNSQueueManagerImpl ) {
- logger.info("Queue manager implementation supports dead letters, start dead letter queue worker.");
+ logger.info("Queue manager implementation supports dead letters, start dead letter queue workers.");
for (int i = 0; i < indexDeadCount; i++) {
- startDeadQueueWorker(QUEUE_NAME);
+ startDeadQueueWorker(AsyncEventQueueType.REGULAR);
+ }
+
+ for (int i = 0; i < utilityDeadCount; i++) {
+ startDeadQueueWorker(AsyncEventQueueType.UTILITY);
+ }
+
+ for (int i = 0; i < deleteDeadCount; i++) {
+ startDeadQueueWorker(AsyncEventQueueType.DELETE);
}
}else{
logger.info("Queue manager implementation does NOT support dead letters, NOT starting dead letter queue worker.");
}
-
- for (int i = 0; i < utilityDeadCount; i++) {
- startDeadQueueWorker(QUEUE_NAME_UTILITY);
- }
}
@@ -1005,11 +996,10 @@ public class AsyncEventServiceImpl implements AsyncEventService {
}
- private void startWorker(final String type) {
- Preconditions.checkNotNull(type, "Worker type required");
+ private void startWorker(final AsyncEventQueueType queueType) {
synchronized (mutex) {
- boolean isUtilityQueue = isNotEmpty(type) && type.toLowerCase().contains(QUEUE_NAME_UTILITY.toLowerCase());
+ String type = getQueueName(queueType);
Observable<List<LegacyQueueMessage>> consumer =
Observable.create( new Observable.OnSubscribe<List<LegacyQueueMessage>>() {
@@ -1017,20 +1007,15 @@ public class AsyncEventServiceImpl implements AsyncEventService {
public void call( final Subscriber<? super List<LegacyQueueMessage>> subscriber ) {
//name our thread so it's easy to see
- long threadNum = isUtilityQueue ?
- counterUtility.incrementAndGet() : counter.incrementAndGet();
- Thread.currentThread().setName( "QueueConsumer_" + type+ "_" + threadNum );
+ long threadNum = getCounter(queueType, false).incrementAndGet();
+ Thread.currentThread().setName( "QueueConsumer_" + type + "_" + threadNum );
List<LegacyQueueMessage> drainList = null;
do {
try {
- if ( isUtilityQueue ){
- drainList = takeFromUtilityQueue();
- }else{
- drainList = take();
+ drainList = take(queueType);
- }
//emit our list in it's entity to hand off to a worker pool
subscriber.onNext(drainList);
@@ -1086,7 +1071,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
// submit the processed messages to index producer
List<LegacyQueueMessage> messagesToAck =
- submitToIndex( indexEventResults, isUtilityQueue );
+ submitToIndex( indexEventResults, queueType );
if ( messagesToAck.size() < messages.size() ) {
logger.warn(
@@ -1096,12 +1081,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
// ack each message if making it to this point
if( messagesToAck.size() > 0 ){
-
- if ( isUtilityQueue ){
- ackUtilityQueue( messagesToAck );
- }else{
- ack( messagesToAck );
- }
+ ack(messagesToAck, queueType, false);
}
return messagesToAck;
@@ -1125,31 +1105,25 @@ public class AsyncEventServiceImpl implements AsyncEventService {
}
- private void startDeadQueueWorker(final String type) {
- Preconditions.checkNotNull(type, "Worker type required");
+ private void startDeadQueueWorker(final AsyncEventQueueType queueType) {
+ String type = getQueueName(queueType);
synchronized (mutex) {
- boolean isUtilityDeadQueue = isNotEmpty(type) && type.toLowerCase().contains(QUEUE_NAME_UTILITY.toLowerCase());
-
Observable<List<LegacyQueueMessage>> consumer =
Observable.create( new Observable.OnSubscribe<List<LegacyQueueMessage>>() {
@Override
public void call( final Subscriber<? super List<LegacyQueueMessage>> subscriber ) {
//name our thread so it's easy to see
- long threadNum = isUtilityDeadQueue ?
- counterUtilityDead.incrementAndGet() : counterIndexDead.incrementAndGet();
- Thread.currentThread().setName( "QueueDeadLetterConsumer_" + type+ "_" + threadNum );
+ long threadNum = getCounter(queueType, true).incrementAndGet();
+ Thread.currentThread().setName( "QueueDeadLetterConsumer_" + type + "_" + threadNum );
List<LegacyQueueMessage> drainList = null;
do {
try {
- if ( isUtilityDeadQueue ){
- drainList = takeFromUtilityDeadQueue();
- }else{
- drainList = takeFromIndexDeadQueue();
- }
+ drainList = take(queueType, true);
+
//emit our list in it's entity to hand off to a worker pool
subscriber.onNext(drainList);
@@ -1198,18 +1172,11 @@ public class AsyncEventServiceImpl implements AsyncEventService {
try {
// put the dead letter messages back in the appropriate queue
- LegacyQueueManager returnQueue = null;
- String queueType;
- if (isUtilityDeadQueue) {
- returnQueue = utilityQueue;
- queueType = "utility";
- } else {
- returnQueue = indexQueue;
- queueType = "index";
- }
+ LegacyQueueManager returnQueue = getQueue(queueType, false);
+
List<LegacyQueueMessage> successMessages = returnQueue.sendQueueMessages(messages);
for (LegacyQueueMessage msg : successMessages) {
- logger.warn("Returning message to {} queue: type:{}, messageId:{} body: {}", queueType, msg.getType(), msg.getMessageId(), msg.getStringBody());
+ logger.warn("Returning message to {} queue: type:{}, messageId:{} body: {}", queueType.toString(), msg.getType(), msg.getMessageId(), msg.getStringBody());
}
int unsuccessfulMessagesSize = messages.size() - successMessages.size();
if (unsuccessfulMessagesSize > 0) {
@@ -1226,16 +1193,12 @@ public class AsyncEventServiceImpl implements AsyncEventService {
for (LegacyQueueMessage msg : messages) {
String messageId = msg.getMessageId();
if (!successMessageIds.contains(messageId)) {
- logger.warn("Failed to return message to {} queue: type:{} messageId:{} body: {}", queueType, msg.getType(), messageId, msg.getStringBody());
+ logger.warn("Failed to return message to {} queue: type:{} messageId:{} body: {}", queueType.toString(), msg.getType(), messageId, msg.getStringBody());
}
}
}
- if (isUtilityDeadQueue) {
- ackUtilityDeadQueue(successMessages);
- } else {
- ackIndexDeadQueue(successMessages);
- }
+ ack(successMessages, queueType, true);
return messages;
}
@@ -1261,7 +1224,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
* Submit results to index and return the queue messages to be ack'd
*
*/
- private List<LegacyQueueMessage> submitToIndex(List<IndexEventResult> indexEventResults, boolean forUtilityQueue) {
+ private List<LegacyQueueMessage> submitToIndex(List<IndexEventResult> indexEventResults, AsyncEventQueueType queueType) {
// if nothing came back then return empty list
if(indexEventResults==null){
@@ -1288,7 +1251,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
// collect into a list of QueueMessages that can be ack'd later
.collect(Collectors.toList());
- queueIndexOperationMessage(combined, forUtilityQueue);
+ queueIndexOperationMessage(combined, queueType);
return queueMessages;
}
@@ -1299,10 +1262,10 @@ public class AsyncEventServiceImpl implements AsyncEventService {
new EntityIndexOperation( applicationScope, id, updatedSince);
queueIndexOperationMessage(
- eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null), false);
+ eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null), AsyncEventQueueType.REGULAR);
}
- public void indexBatch(final List<EdgeScope> edges, final long updatedSince, boolean forUtilityQueue) {
+ public void indexBatch(final List<EdgeScope> edges, final long updatedSince, AsyncEventQueueType queueType) {
final List<EntityIndexEvent> batch = new ArrayList<>();
edges.forEach(e -> {
@@ -1315,7 +1278,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
logger.trace("Offering batch of EntityIndexEvent of size {}", batch.size());
- offerBatch( batch, forUtilityQueue );
+ offerBatch( batch, queueType );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/221b99dd/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index 7eecf04..eb63056 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -40,10 +40,14 @@ public interface IndexProcessorFig extends GuicyFig {
String ELASTICSEARCH_WORKER_COUNT_UTILITY = "elasticsearch.worker_count_utility";
+ String ELASTICSEARCH_WORKER_COUNT_DELETE = "elasticsearch.worker_count_delete";
+
String ELASTICSEARCH_WORKER_COUNT_DEADLETTER = "elasticsearch.worker_count_deadletter";
String ELASTICSEARCH_WORKER_COUNT_UTILITY_DEADLETTER = "elasticsearch.worker_count_utility_deadletter";
+ String ELASTICSEARCH_WORKER_COUNT_DELETE_DEADLETTER = "elasticsearch.worker_count_delete_deadletter";
+
String EVENT_CONCURRENCY_FACTOR = "event.concurrency.factor";
String ELASTICSEARCH_QUEUE_IMPL = "elasticsearch.queue_impl";
@@ -105,6 +109,13 @@ public interface IndexProcessorFig extends GuicyFig {
int getWorkerCountUtility();
/**
+ * The number of worker threads used to read delete requests from the queue.
+ */
+ @Default("1")
+ @Key(ELASTICSEARCH_WORKER_COUNT_DELETE)
+ int getWorkerCountDelete();
+
+ /**
* The number of worker threads used to read dead messages from the index dead letter queue and reload them into the index queue.
*/
@Default("1")
@@ -119,6 +130,13 @@ public interface IndexProcessorFig extends GuicyFig {
int getWorkerCountUtilityDeadLetter();
/**
+ * The number of worker threads used to read dead messages from the delete dead letter queue and reload them into the delete queue.
+ */
+ @Default("1")
+ @Key(ELASTICSEARCH_WORKER_COUNT_DELETE_DEADLETTER)
+ int getWorkerCountDeleteDeadLetter();
+
+ /**
* Set the implementation to use for queuing.
* Valid values: TEST, LOCAL, SQS, SNS
* NOTE: SQS and SNS equate to the same implementation of Amazon queue services.
http://git-wip-us.apache.org/repos/asf/usergrid/blob/221b99dd/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
index 2b3573e..d6bdd93 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
@@ -20,6 +20,7 @@
package org.apache.usergrid.corepersistence.index;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType;
import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -42,7 +43,7 @@ public interface ReIndexAction {
* Index a batch list of entities. Goes to the utility queue.
* @param edges
* @param updatedSince
- * @param forUtilityQueue
+ * @param queueType
*/
- void indexBatch(final List<EdgeScope> edges, final long updatedSince, boolean forUtilityQueue);
+ void indexBatch(final List<EdgeScope> edges, final long updatedSince, AsyncEventQueueType queueType);
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/221b99dd/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index 0660d5e..c7371b3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -169,7 +170,7 @@ public class ReIndexServiceImpl implements ReIndexService {
.buffer( indexProcessorFig.getReindexBufferSize())
.doOnNext( edgeScopes -> {
logger.info("Sending batch of {} to be indexed.", edgeScopes.size());
- indexService.indexBatch(edgeScopes, modifiedSince, true);
+ indexService.indexBatch(edgeScopes, modifiedSince, AsyncEventQueueType.UTILITY);
count.addAndGet(edgeScopes.size() );
if( edgeScopes.size() > 0 ) {
writeCursorState(jobId, edgeScopes.get(edgeScopes.size() - 1));
http://git-wip-us.apache.org/repos/asf/usergrid/blob/221b99dd/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
index e9aa6c8..b1b7f75 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -20,6 +20,7 @@
package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType;
import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
import org.apache.usergrid.corepersistence.asyncevents.EventBuilderImpl;
@@ -122,7 +123,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
logger.info("Edge {} is deleted when seeking, deleting the edge", markedEdge);
final IndexOperationMessage indexOperationMessage = eventBuilder.buildDeleteEdge(applicationScope, markedEdge);
- asyncEventService.queueIndexOperationMessage(indexOperationMessage, true);
+ asyncEventService.queueIndexOperationMessage(indexOperationMessage, AsyncEventQueueType.DELETE);
}
@@ -132,7 +133,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
logger.info("Edge {} has a deleted source node, deleting the entity for id {}", markedEdge, sourceNodeId);
final IndexOperationMessage indexOperationMessage = eventBuilder.buildEntityDelete(applicationScope, sourceNodeId);
- asyncEventService.queueIndexOperationMessage(indexOperationMessage, true);
+ asyncEventService.queueIndexOperationMessage(indexOperationMessage, AsyncEventQueueType.DELETE);
}
@@ -142,7 +143,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
logger.info("Edge {} has a deleted target node, deleting the entity for id {}", markedEdge, targetNodeId);
final IndexOperationMessage indexOperationMessage = eventBuilder.buildEntityDelete(applicationScope, targetNodeId);
- asyncEventService.queueIndexOperationMessage(indexOperationMessage, true);
+ asyncEventService.queueIndexOperationMessage(indexOperationMessage, AsyncEventQueueType.DELETE);
}
@@ -227,13 +228,13 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
}
}
- private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector() {
+ private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector(AsyncEventQueueType queueType) {
return observable -> observable
.collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
.filter(msg -> !msg.isEmpty())
.doOnNext(indexOperation -> {
- asyncEventService.queueIndexOperationMessage(indexOperation, false);
+ asyncEventService.queueIndexOperationMessage(indexOperation, queueType);
});
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/221b99dd/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
index 1b662cc..c75545e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
@@ -20,6 +20,7 @@
package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType;
import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
import org.apache.usergrid.corepersistence.asyncevents.EventBuilderImpl;
@@ -122,7 +123,7 @@ public abstract class AbstractReadReverseGraphFilter extends AbstractPathFilter<
logger.info("Edge {} is deleted when seeking, deleting the edge", markedEdge);
final IndexOperationMessage indexOperationMessage = eventBuilder.buildDeleteEdge(applicationScope, markedEdge);
- asyncEventService.queueIndexOperationMessage(indexOperationMessage, true);
+ asyncEventService.queueIndexOperationMessage(indexOperationMessage, AsyncEventQueueType.DELETE);
}
@@ -132,7 +133,7 @@ public abstract class AbstractReadReverseGraphFilter extends AbstractPathFilter<
logger.info("Edge {} has a deleted source node, deleting the entity for id {}", markedEdge, sourceNodeId);
final IndexOperationMessage indexOperationMessage = eventBuilder.buildEntityDelete(applicationScope, sourceNodeId);
- asyncEventService.queueIndexOperationMessage(indexOperationMessage, true);
+ asyncEventService.queueIndexOperationMessage(indexOperationMessage, AsyncEventQueueType.DELETE);
}
@@ -142,7 +143,7 @@ public abstract class AbstractReadReverseGraphFilter extends AbstractPathFilter<
logger.info("Edge {} has a deleted target node, deleting the entity for id {}", markedEdge, targetNodeId);
final IndexOperationMessage indexOperationMessage = eventBuilder.buildEntityDelete(applicationScope, targetNodeId);
- asyncEventService.queueIndexOperationMessage(indexOperationMessage, true);
+ asyncEventService.queueIndexOperationMessage(indexOperationMessage, AsyncEventQueueType.DELETE);
}
@@ -221,13 +222,13 @@ public abstract class AbstractReadReverseGraphFilter extends AbstractPathFilter<
}
}
- private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector() {
+ private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector(AsyncEventQueueType queueType) {
return observable -> observable
.collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
.filter(msg -> !msg.isEmpty())
.doOnNext(indexOperation -> {
- asyncEventService.queueIndexOperationMessage(indexOperation, false);
+ asyncEventService.queueIndexOperationMessage(indexOperation, queueType);
});
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/221b99dd/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/CollectionVersionTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/CollectionVersionTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/CollectionVersionTest.java
index 0278708..a3c7284 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/CollectionVersionTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/CollectionVersionTest.java
@@ -19,5 +19,14 @@
package org.apache.usergrid.corepersistence.index;
+import org.junit.runner.RunWith;
+import net.jcip.annotations.NotThreadSafe;
+import org.apache.usergrid.corepersistence.TestIndexModule;
+import org.apache.usergrid.persistence.core.test.UseModules;
+import org.apache.usergrid.persistence.index.impl.EsRunner;
+
+@RunWith( EsRunner.class )
+@UseModules( { TestIndexModule.class } )
+@NotThreadSafe
public class CollectionVersionTest {
}
[2/2] usergrid git commit: Add collection version testing and new
_version endpoint to retrieve the version for a collection.
Posted by md...@apache.org.
Add collection version testing and new _version endpoint to retrieve the version for a collection.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/b6d14069
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/b6d14069
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/b6d14069
Branch: refs/heads/collectionDelete
Commit: b6d14069aa500530d5b86112efd732031e52b0e8
Parents: 221b99d
Author: Mike Dunker <md...@google.com>
Authored: Mon Jul 17 14:54:31 2017 -0700
Committer: Mike Dunker <md...@google.com>
Committed: Mon Jul 17 14:54:31 2017 -0700
----------------------------------------------------------------------
.../usergrid/corepersistence/CoreModule.java | 2 +-
.../corepersistence/CpEntityManager.java | 17 +-
.../corepersistence/CpEntityManagerFactory.java | 6 +-
.../index/CollectionClearService.java | 35 ++++
.../index/CollectionClearServiceImpl.java | 69 +++++++
.../index/CollectionDeleteService.java | 30 ----
.../index/CollectionDeleteServiceImpl.java | 57 ------
.../index/CollectionVersionManagerImpl.java | 4 +-
.../usergrid/persistence/EntityManager.java | 4 +-
.../org/apache/usergrid/persistence/Query.java | 18 +-
.../index/CollectionVersionTest.java | 32 ----
.../persistence/index/query/Identifier.java | 3 +-
.../rest/applications/CollectionResource.java | 58 +++++-
.../collection/CollectionClearTest.java | 179 +++++++++++++++++++
14 files changed, 374 insertions(+), 140 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b6d14069/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index 5515abd..6a93af5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -143,7 +143,7 @@ public class CoreModule extends AbstractModule {
bind( ReIndexService.class ).to( ReIndexServiceImpl.class );
- bind( CollectionDeleteService.class ).to( CollectionDeleteServiceImpl.class );
+ bind( CollectionClearService.class ).to( CollectionClearServiceImpl.class );
bind( ExportService.class ).to( ExportServiceImpl.class );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b6d14069/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index a76720d..e192939 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -158,7 +158,7 @@ public class CpEntityManager implements EntityManager {
private EntityCollectionManager ecm;
public QueueManagerFactory queueManagerFactory;
- private CollectionDeleteService collectionDeleteService;
+ private CollectionClearService collectionClearService;
private CollectionVersionManagerFactory collectionVersionManagerFactory;
@@ -187,7 +187,7 @@ public class CpEntityManager implements EntityManager {
final CollectionSettingsFactory collectionSettingsFactory,
final UUID applicationId,
final QueueManagerFactory queueManagerFactory,
- final CollectionDeleteService collectionDeleteService,
+ final CollectionClearService collectionClearService,
final CollectionVersionManagerFactory collectionVersionManagerFactory) {
this.entityManagerFig = entityManagerFig;
@@ -255,7 +255,7 @@ public class CpEntityManager implements EntityManager {
this.skipAggregateCounters = false;
this.queueManagerFactory = queueManagerFactory;
- this.collectionDeleteService = collectionDeleteService;
+ this.collectionClearService = collectionClearService;
this.collectionVersionManagerFactory = collectionVersionManagerFactory;
}
@@ -1890,9 +1890,16 @@ public class CpEntityManager implements EntityManager {
}
@Override
- public void deleteCollection( String collectionName ){
+ public void clearCollection(String collectionName ){
- collectionDeleteService.deleteCollection(applicationId, collectionName);
+ collectionClearService.clearCollection(applicationId, collectionName);
+
+ }
+
+ @Override
+ public String getCollectionVersion(String collectionName ){
+
+ return collectionClearService.getCollectionVersion(applicationId, collectionName);
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b6d14069/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index b3dac57..2e3b180 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -117,7 +117,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
private final CollectionSettingsFactory collectionSettingsFactory;
private ActorSystemManager actorSystemManager;
private final LockManager lockManager;
- private final CollectionDeleteService collectionDeleteService;
+ private final CollectionClearService collectionClearService;
private final CollectionVersionManagerFactory collectionVersionManagerFactory;
private final QueueManagerFactory queueManagerFactory;
@@ -143,7 +143,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
this.collectionService = injector.getInstance( CollectionService.class );
this.connectionService = injector.getInstance( ConnectionService.class );
this.collectionSettingsFactory = injector.getInstance( CollectionSettingsFactory.class );
- this.collectionDeleteService = injector.getInstance( CollectionDeleteService.class );
+ this.collectionClearService = injector.getInstance( CollectionClearService.class );
this.collectionVersionManagerFactory = injector.getInstance( CollectionVersionManagerFactory.class );
Properties properties = cassandraService.getProperties();
@@ -395,7 +395,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
collectionSettingsFactory,
applicationId,
queueManagerFactory,
- collectionDeleteService,
+ collectionClearService,
collectionVersionManagerFactory);
return em;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b6d14069/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionClearService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionClearService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionClearService.java
new file mode 100644
index 0000000..76380e5
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionClearService.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+import java.util.UUID;
+
+public interface CollectionClearService {
+
+ /**
+ * Clear the entities from the current version of a collection by changing the collection version and queueing up a delete of the old entities
+ */
+ void clearCollection(final UUID applicationID, final String baseCollectionName);
+
+ /**
+ * Get the current version of a collection
+ */
+ String getCollectionVersion(final UUID applicationID, final String baseCollectionName);
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b6d14069/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionClearServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionClearServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionClearServiceImpl.java
new file mode 100644
index 0000000..ff64d6a
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionClearServiceImpl.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+import com.google.inject.Inject;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+
+public class CollectionClearServiceImpl implements CollectionClearService {
+ private static final Logger logger = LoggerFactory.getLogger(CollectionClearServiceImpl.class );
+
+ private CollectionVersionManagerFactory collectionVersionManagerFactory;
+ private AsyncEventService asyncEventService;
+
+ @Inject
+ public CollectionClearServiceImpl(
+ final CollectionVersionManagerFactory collectionVersionManagerFactory,
+ final AsyncEventService asyncEventService
+ )
+ {
+ this.collectionVersionManagerFactory = collectionVersionManagerFactory;
+ this.asyncEventService = asyncEventService;
+ }
+
+ @Override
+ public void clearCollection(final UUID applicationID, final String baseCollectionName) {
+ CollectionScope scope = new CollectionScopeImpl(applicationID, baseCollectionName);
+ CollectionVersionManager collectionVersionManager = collectionVersionManagerFactory.getInstance(scope);
+
+ // change version
+ String oldVersion = collectionVersionManager.updateCollectionVersion();
+ logger.info("Collection cleared: appID:{} baseCollectionName:{} oldVersion:{} newVersion:{}",
+ applicationID.toString(), baseCollectionName, oldVersion, collectionVersionManager.getCollectionVersion(false));
+
+ // queue up delete of old version entities
+ asyncEventService.queueCollectionDelete(scope, oldVersion);
+ }
+
+ @Override
+ public String getCollectionVersion(UUID applicationID, String baseCollectionName) {
+ CollectionScope scope = new CollectionScopeImpl(applicationID, baseCollectionName);
+ CollectionVersionManager collectionVersionManager = collectionVersionManagerFactory.getInstance(scope);
+
+ String currentVersion = collectionVersionManager.getCollectionVersion(true);
+
+ return currentVersion;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b6d14069/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteService.java
deleted file mode 100644
index 85b8fed..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteService.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.index;
-
-import java.util.UUID;
-
-public interface CollectionDeleteService {
-
- /**
- * Delete the current version of a collection by changing the collection version and queueing up a delete of the old entities
- */
- void deleteCollection(final UUID applicationID, final String baseCollectionName);
-}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b6d14069/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteServiceImpl.java
deleted file mode 100644
index 5c64079..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteServiceImpl.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.index;
-
-import com.google.inject.Inject;
-import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.UUID;
-
-public class CollectionDeleteServiceImpl implements CollectionDeleteService {
- private static final Logger logger = LoggerFactory.getLogger(CollectionDeleteServiceImpl.class );
-
- private CollectionVersionManagerFactory collectionVersionManagerFactory;
- private AsyncEventService asyncEventService;
-
- @Inject
- public CollectionDeleteServiceImpl(
- final CollectionVersionManagerFactory collectionVersionManagerFactory,
- final AsyncEventService asyncEventService
- )
- {
- this.collectionVersionManagerFactory = collectionVersionManagerFactory;
- this.asyncEventService = asyncEventService;
- }
-
- @Override
- public void deleteCollection(final UUID applicationID, final String baseCollectionName) {
- CollectionScope scope = new CollectionScopeImpl(applicationID, baseCollectionName);
- CollectionVersionManager collectionVersionManager = collectionVersionManagerFactory.getInstance(scope);
-
- // change version
- String oldVersion = collectionVersionManager.updateCollectionVersion();
-
- // queue up delete of old version entities
- asyncEventService.queueCollectionDelete(scope, oldVersion);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b6d14069/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManagerImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManagerImpl.java
index 7ed557c..c5bb417 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManagerImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManagerImpl.java
@@ -95,7 +95,7 @@ public class CollectionVersionManagerImpl implements CollectionVersionManager {
}
String oldCollectionVersion = getCollectionVersion(true);
- String newCollectionVersion = getNewCollectionVersion();
+ String newCollectionVersion = generateNewCollectionVersion();
mapManager.putLong(MAP_PREFIX_LAST_CHANGED+collectionName, System.currentTimeMillis());
mapManager.putString(MAP_PREFIX_VERSION+collectionName, newCollectionVersion);
cache.put(scope, newCollectionVersion);
@@ -104,7 +104,7 @@ public class CollectionVersionManagerImpl implements CollectionVersionManager {
return oldCollectionVersion;
}
- private static String getNewCollectionVersion() {
+ private static String generateNewCollectionVersion() {
return UUIDGenerator.newTimeUUID().toString();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b6d14069/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
index a977f31..f55263d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
@@ -530,7 +530,9 @@ public interface EntityManager {
Object getCollectionSettings( String collectionName );
- void deleteCollection( String collectionName );
+ void clearCollection(String collectionName);
+
+ String getCollectionVersion(String collectionName);
public void grantRolePermission( String roleName, String permission ) throws Exception;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b6d14069/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java b/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java
index 900bda5..5662f06 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java
@@ -31,6 +31,8 @@ import org.apache.usergrid.persistence.index.query.tree.Operand;
import org.apache.usergrid.persistence.index.utils.ClassUtils;
import org.apache.usergrid.persistence.index.utils.ListUtils;
import org.apache.usergrid.persistence.index.utils.MapUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
@@ -41,7 +43,7 @@ import java.util.Map.Entry;
public class Query {
-
+ private static final Logger logger = LoggerFactory.getLogger(Query.class);
public enum Level {
@@ -319,6 +321,13 @@ public class Query {
public static Query fromIdentifier( Object id ) {
+ if (id == null) {
+ throw new IllegalArgumentException("null identifier passed in");
+ }
+ Identifier objectIdentifier = Identifier.from(id);
+ if (objectIdentifier == null) {
+ throw new IllegalArgumentException("Supplied id results in null Identifier");
+ }
Query q = new Query();
q.addIdentifier( Identifier.from(id) );
return q;
@@ -409,6 +418,10 @@ public class Query {
}
for ( Identifier identifier : identifiers ) {
+ if (identifier == null) {
+ logger.error("containsUuidIdentifiersOnly(): identifier in identifiers list is null");
+ return false;
+ }
if ( !identifier.isUUID() ) {
return false;
}
@@ -635,6 +648,9 @@ public class Query {
if ( identifiers == null ) {
identifiers = new ArrayList<Identifier>();
}
+ if (identifier == null) {
+ throw new IllegalArgumentException("adding null identifier is not allowed");
+ }
identifiers.add( identifier );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b6d14069/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/CollectionVersionTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/CollectionVersionTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/CollectionVersionTest.java
deleted file mode 100644
index a3c7284..0000000
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/CollectionVersionTest.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.index;
-
-import org.junit.runner.RunWith;
-import net.jcip.annotations.NotThreadSafe;
-import org.apache.usergrid.corepersistence.TestIndexModule;
-import org.apache.usergrid.persistence.core.test.UseModules;
-import org.apache.usergrid.persistence.index.impl.EsRunner;
-
-@RunWith( EsRunner.class )
-@UseModules( { TestIndexModule.class } )
-@NotThreadSafe
-public class CollectionVersionTest {
-}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b6d14069/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Identifier.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Identifier.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Identifier.java
index 84a28f0..70d2284 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Identifier.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Identifier.java
@@ -34,6 +34,7 @@ public class Identifier implements Serializable {
public static final String UUID_REX =
"[A-Fa-f0-9]{8}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{12}";
public static final String EMAIL_REX = "[a-zA-Z0-9._%'+\\-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}";
+ public static final String NAME_REX = "[a-zA-Z0-9_\\-./'+ ]*";
public enum Type {
UUID, NAME, EMAIL
@@ -46,7 +47,7 @@ public class Identifier implements Serializable {
static Pattern emailRegEx = Pattern.compile( EMAIL_REX );
// "Pattern nameRegEx" below used to be [a-zA-Z0-9_\\-./], changed it to contain a 'space' to
// address https://issues.apache.org/jira/browse/USERGRID-94
- static Pattern nameRegEx = Pattern.compile( "[a-zA-Z0-9_\\-./'+ ]*" );
+ static Pattern nameRegEx = Pattern.compile( NAME_REX );
private Identifier( Type type, Object value ) {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b6d14069/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
index 0ab0661..86b3216 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
@@ -51,6 +51,8 @@ import org.apache.usergrid.services.ServicePayload;
import com.fasterxml.jackson.jaxrs.json.annotation.JSONP;
+import java.util.HashMap;
+import java.util.Map;
import java.util.UUID;
@@ -75,7 +77,7 @@ public class CollectionResource extends ServiceResource {
@POST
- @Path("{itemName}/clear")
+ @Path("{itemName}/_clear")
@Produces({MediaType.APPLICATION_JSON, "application/javascript"})
@RequireApplicationAccess
public ApiResponse executeClearCollection(
@@ -84,16 +86,16 @@ public class CollectionResource extends ServiceResource {
@QueryParam(CONFIRM_COLLECTION_NAME) String confirmCollectionName) throws Exception {
if (logger.isTraceEnabled()){
- logger.trace( "CollectionResource.executeDeleteOnCollection" );
+ logger.trace( "CollectionResource.executeClearCollection" );
}
- if (!Application.isCustomCollectionName(itemName.toString())) {
+ if (!Application.isCustomCollectionName(itemName.getPath())) {
throw new IllegalArgumentException(
"Cannot clear built-in collections (" + itemName + ")."
);
}
- if (!itemName.toString().equals(confirmCollectionName)) {
+ if (!itemName.getPath().equals(confirmCollectionName)) {
throw new IllegalArgumentException(
"Cannot delete collection without supplying correct collection name in query parameter " + CONFIRM_COLLECTION_NAME
);
@@ -103,7 +105,7 @@ public class CollectionResource extends ServiceResource {
UUID applicationId = getApplicationId();
- emf.getEntityManager(applicationId).deleteCollection(itemName.toString());
+ emf.getEntityManager(applicationId).clearCollection(itemName.getPath());
if (logger.isTraceEnabled()) {
logger.trace("CollectionResource.executeDeleteOnCollection() deleted, appId={} collection={}",
@@ -111,12 +113,54 @@ public class CollectionResource extends ServiceResource {
}
ApiResponse response = createApiResponse();
- response.setAction("delete");
+ response.setAction("post");
response.setApplication(emf.getEntityManager( applicationId ).getApplication());
response.setParams(ui.getQueryParameters());
if (logger.isTraceEnabled()) {
- logger.trace("CollectionResource.executeDeleteOnCollection() sending response");
+ logger.trace("CollectionResource.executeClearCollection() sending response");
+ }
+
+ return response;
+
+ }
+
+ @GET
+ @Path( "{itemName}/_version")
+ @Produces({MediaType.APPLICATION_JSON,"application/javascript"})
+ @RequireApplicationAccess
+ @JSONP
+ public ApiResponse executeGetCollectionVersion(
+ @Context UriInfo ui,
+ @PathParam("itemName") PathSegment itemName,
+ @QueryParam("callback") @DefaultValue("callback") String callback ) throws Exception {
+
+ if (logger.isTraceEnabled()){
+ logger.trace( "CollectionResource.executeGetCollectionVersion" );
+ }
+
+ if (!Application.isCustomCollectionName(itemName.getPath())) {
+ throw new IllegalArgumentException(
+ "Built-in collections are not versioned."
+ );
+ }
+
+ addItemToServiceContext( ui, itemName );
+
+ UUID applicationId = getApplicationId();
+
+ String currentVersion = emf.getEntityManager(applicationId).getCollectionVersion(itemName.getPath());
+
+ ApiResponse response = createApiResponse();
+ response.setAction("get");
+ response.setApplication(emf.getEntityManager( applicationId ).getApplication());
+ Map<String,Object> data = new HashMap<>();
+ data.put("collectionName",itemName.getPath());
+ data.put("version",currentVersion);
+ response.setData(data);
+
+ if (logger.isTraceEnabled()) {
+ logger.trace("CollectionResource.executeGetCollectionVersion() sending response");
}
return response;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b6d14069/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionClearTest.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionClearTest.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionClearTest.java
new file mode 100644
index 0000000..e40c193
--- /dev/null
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionClearTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.rest.applications.collection;
+
+
+import org.apache.usergrid.rest.test.resource.AbstractRestIT;
+import org.apache.usergrid.rest.test.resource.model.ApiResponse;
+import org.apache.usergrid.rest.test.resource.model.Collection;
+import org.apache.usergrid.rest.test.resource.model.Entity;
+import org.apache.usergrid.rest.test.resource.model.QueryParameters;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests collection clear functionality.
+ */
+
+public class CollectionClearTest extends AbstractRestIT {
+
+ private static final Logger logger = LoggerFactory.getLogger(CollectionClearTest.class);
+
+
+    /**
+     * Exercises the collection clear lifecycle through the REST API:
+     * <ol>
+     *   <li>the "_version" endpoint reports an empty version before any clear,</li>
+     *   <li>entities are created and read back,</li>
+     *   <li>the collection is cleared via the "_clear" endpoint and the reported version becomes non-empty,</li>
+     *   <li>a read of the cleared collection returns no entities,</li>
+     *   <li>new entities can be created under the same collection name and the version stays the same.</li>
+     * </ol>
+     *
+     * @throws Exception if any REST call or assertion helper fails
+     */
+    @Test
+    public void collectionDelete() throws Exception {
+
+        String collectionName = "children";
+        int numEntities = 10;
+        String namePrefix = "child";
+
+        int numEntitiesAfterClear = 5;
+        String namePrefixAfterClear = "abc";
+
+        // verify collection version is empty
+        assertEquals("", fetchCollectionVersion(collectionName));
+
+        createEntities( collectionName, namePrefix, numEntities );
+
+        // retrieve entities, provide 1 more than num entities
+        QueryParameters parms = new QueryParameters().setLimit( numEntities + 1 ).setQuery("order by created asc");
+        List<Entity> entities = retrieveEntities(collectionName, namePrefix, parms, numEntities, false);
+        assertEquals(numEntities, entities.size());
+
+        // clear the collection; the confirmation parameters are kept separate from the read parameters
+        Map<String, Object> payload = new HashMap<>();
+        QueryParameters clearParms = new QueryParameters().setKeyValue("confirm_collection_name", collectionName);
+        this.app().collection(collectionName).collection("_clear").post(true, payload, clearParms);
+
+        // verify collection version has changed
+        String newVersion = fetchCollectionVersion(collectionName);
+        assertNotEquals("", newVersion);
+
+        // validate that 0 entities left, using fresh read parameters instead of the clear confirmation ones
+        QueryParameters afterClearParms = new QueryParameters().setLimit( numEntities + 1 );
+        List<Entity> entitiesAfterClear = retrieveEntities(collectionName, namePrefix, afterClearParms, 0, true);
+        assertEquals(0, entitiesAfterClear.size());
+
+        // insert more entities using same collectionName
+        createEntities( collectionName, namePrefixAfterClear, numEntitiesAfterClear );
+
+        // validate correct number of entities
+        parms = new QueryParameters().setLimit( numEntitiesAfterClear + 1 ).setQuery("order by created asc");
+        List<Entity> newEntities = retrieveEntities(collectionName, namePrefixAfterClear, parms, numEntitiesAfterClear, false);
+        assertEquals(numEntitiesAfterClear, newEntities.size());
+
+        // verify collection version has not changed
+        assertEquals(newVersion, fetchCollectionVersion(collectionName));
+    }
+
+
+    /**
+     * Reads the "_version" endpoint of a collection, asserts that the response echoes the collection name,
+     * and returns the reported version.
+     *
+     * @param collectionName name of the collection to query
+     * @return the value of the "version" entry in the response data
+     * @throws Exception if the REST call fails
+     */
+    private String fetchCollectionVersion(String collectionName) throws Exception {
+        ApiResponse versionResponse = this.app().collection(collectionName).collection("_version").get().getResponse();
+        // cast to the Map interface rather than a concrete implementation chosen by the JSON deserializer
+        Map<?, ?> dataMap = (Map<?, ?>) versionResponse.getData();
+        assertEquals(collectionName, dataMap.get("collectionName"));
+        return (String) dataMap.get("version");
+    }
+
+
+    /**
+     * Posts {@code numOfEntities} entities to the given collection and then calls
+     * {@code waitForQueueDrainAndRefreshIndex()}. The i-th entity (counting from 1) is named
+     * {@code namePrefix + i} and carries i in its "num" property.
+     *
+     * @param collectionName collection the entities are posted to
+     * @param namePrefix prefix of every entity name; the 1-based sequence number is appended to it
+     * @param numOfEntities how many entities to create
+     * @return the locally built entities that were posted, in creation order
+     */
+    public List<Entity> createEntities(String collectionName, String namePrefix, int numOfEntities ){
+        List<Entity> created = new LinkedList<>( );
+
+        int seq = 1;
+        while ( seq <= numOfEntities ) {
+            Map<String, Object> fields = new HashMap<>();
+            fields.put( "name", namePrefix + seq );
+            fields.put( "num", seq );
+
+            Entity toPost = new Entity( fields );
+            created.add( toPost );
+            this.app().collection( collectionName ).post( toPost );
+
+            // progress marker every 100 entities
+            if ( seq % 100 == 0 ) {
+                logger.info("created {} entities", seq);
+            }
+            seq++;
+        }
+        logger.info("created {} total entities", numOfEntities);
+
+        this.waitForQueueDrainAndRefreshIndex();
+
+        return created;
+    }
+
+    /**
+     * Reads every page of a collection and checks the entities against the expected naming sequence.
+     * Each returned entity must be named {@code namePrefix + counter}, where the counter starts at 1 and
+     * counts up, or starts at {@code numOfEntities} and counts down when {@code reverseOrder} is set.
+     * Finally asserts that exactly {@code numOfEntities} entities were returned.
+     *
+     * @param collectionName collection to read
+     * @param namePrefix expected prefix of every entity name
+     * @param parms query parameters used for the first request and for every following page request
+     * @param numOfEntities number of entities the collection is expected to return in total
+     * @param reverseOrder true if the names are expected in descending sequence order
+     * @return the entities in the order they were returned
+     */
+    public List<Entity> retrieveEntities(String collectionName, String namePrefix, QueryParameters parms, int numOfEntities, boolean reverseOrder){
+        List<Entity> entities = new LinkedList<>( );
+        Collection testCollection = this.app().collection( collectionName ).get(parms, true);
+
+        int entityNum = reverseOrder ? numOfEntities : 1;
+        int step = reverseOrder ? -1 : 1;
+
+        while (true) {
+            // drain the current page, checking each name against the expected sequence
+            while (testCollection.hasNext()) {
+                Entity returnedEntity = testCollection.next();
+                assertEquals(namePrefix + entityNum, returnedEntity.get("name"));
+                entities.add(returnedEntity);
+                entityNum += step;
+            }
+
+            // a page without a cursor is the last one
+            if (testCollection.getCursor() == null) {
+                break;
+            }
+            testCollection = this.app().collection(collectionName).getNextPage(testCollection, parms, true);
+        }
+
+        // expected value first so a failure message reports the counts the right way round
+        assertEquals(numOfEntities, entities.size());
+        return entities;
+    }
+
+}