You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by md...@apache.org on 2017/07/17 22:11:24 UTC

[1/2] usergrid git commit: move deletes to new delete queue -- read repair will fix attempts to access deleted entities and connections, so indexing and collection deletes can proceed more slowly than other types of changes

Repository: usergrid
Updated Branches:
  refs/heads/collectionDelete 99ba349c8 -> b6d14069a


move deletes to new delete queue -- read repair will fix attempts to access deleted entities and connections, so indexing and collection deletes can proceed more slowly than other types of changes


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/221b99dd
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/221b99dd
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/221b99dd

Branch: refs/heads/collectionDelete
Commit: 221b99dd9d81968a5629647771a6eaf83c4177be
Parents: 99ba349
Author: Mike Dunker <md...@google.com>
Authored: Mon Jul 10 08:03:52 2017 -0700
Committer: Mike Dunker <md...@google.com>
Committed: Mon Jul 10 08:03:52 2017 -0700

----------------------------------------------------------------------
 .../asyncevents/AsyncEventQueueType.java        |  35 +++
 .../asyncevents/AsyncEventService.java          |   4 +-
 .../asyncevents/AsyncEventServiceImpl.java      | 315 ++++++++-----------
 .../index/IndexProcessorFig.java                |  18 ++
 .../corepersistence/index/ReIndexAction.java    |   5 +-
 .../index/ReIndexServiceImpl.java               |   3 +-
 .../read/traverse/AbstractReadGraphFilter.java  |  11 +-
 .../AbstractReadReverseGraphFilter.java         |  11 +-
 .../index/CollectionVersionTest.java            |   9 +
 9 files changed, 220 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/221b99dd/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventQueueType.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventQueueType.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventQueueType.java
new file mode 100644
index 0000000..4b91e17
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventQueueType.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents;
+
+public enum AsyncEventQueueType {
+    REGULAR("regular"), UTILITY("utility"), DELETE("delete");
+    // Human-readable label returned by toString(); assigned once in the constructor, hence final.
+    private final String displayName;
+    AsyncEventQueueType(String displayName) {
+        this.displayName = displayName;
+    }
+
+    @Override
+    public String toString() {
+        return displayName;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/usergrid/blob/221b99dd/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index 5fe4295..7ce208f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -85,9 +85,9 @@ public interface AsyncEventService extends ReIndexAction {
     /**
      *
      * @param indexOperationMessage
-     * @param forUtilityQueue
+     * @param queueType
      */
-    void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage, boolean forUtilityQueue);
+    void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage, AsyncEventQueueType queueType);
 
     /**
      * @param applicationScope

http://git-wip-us.apache.org/repos/asf/usergrid/blob/221b99dd/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 257e172..e33865e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -103,13 +103,16 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     public int MAX_TAKE = 10;
     public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars
     public static final String QUEUE_NAME_UTILITY = "utility"; //keep this short as AWS limits queue name size to 80 chars
+    public static final String QUEUE_NAME_DELETE = "delete";
     public static final String DEAD_LETTER_SUFFIX = "_dead";
 
 
     private final LegacyQueueManager indexQueue;
     private final LegacyQueueManager utilityQueue;
+    private final LegacyQueueManager deleteQueue;
     private final LegacyQueueManager indexQueueDead;
     private final LegacyQueueManager utilityQueueDead;
+    private final LegacyQueueManager deleteQueueDead;
     private final IndexProcessorFig indexProcessorFig;
     private final LegacyQueueFig queueFig;
     private final CollectionVersionFig collectionVersionFig;
@@ -133,8 +136,10 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     private final Counter indexErrorCounter;
     private final AtomicLong counter = new AtomicLong();
     private final AtomicLong counterUtility = new AtomicLong();
+    private final AtomicLong counterDelete = new AtomicLong();
     private final AtomicLong counterIndexDead = new AtomicLong();
     private final AtomicLong counterUtilityDead = new AtomicLong();
+    private final AtomicLong counterDeleteDead = new AtomicLong();
     private final AtomicLong inFlight = new AtomicLong();
     private final Histogram messageCycle;
     private final MapManager esMapPersistence;
@@ -177,16 +182,24 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         LegacyQueueScope utilityQueueScope =
             new LegacyQueueScopeImpl(QUEUE_NAME_UTILITY, LegacyQueueScope.RegionImplementation.ALL);
 
+        LegacyQueueScope deleteQueueScope =
+            new LegacyQueueScopeImpl(QUEUE_NAME_DELETE, LegacyQueueScope.RegionImplementation.ALL);
+
         LegacyQueueScope indexQueueDeadScope =
             new LegacyQueueScopeImpl(QUEUE_NAME, LegacyQueueScope.RegionImplementation.ALL, true);
 
         LegacyQueueScope utilityQueueDeadScope =
             new LegacyQueueScopeImpl(QUEUE_NAME_UTILITY, LegacyQueueScope.RegionImplementation.ALL, true);
 
+        LegacyQueueScope deleteQueueDeadScope =
+            new LegacyQueueScopeImpl(QUEUE_NAME_DELETE, LegacyQueueScope.RegionImplementation.ALL, true);
+
         this.indexQueue = queueManagerFactory.getQueueManager(indexQueueScope);
         this.utilityQueue = queueManagerFactory.getQueueManager(utilityQueueScope);
+        this.deleteQueue = queueManagerFactory.getQueueManager(deleteQueueScope);
         this.indexQueueDead = queueManagerFactory.getQueueManager(indexQueueDeadScope);
         this.utilityQueueDead = queueManagerFactory.getQueueManager(utilityQueueDeadScope);
+        this.deleteQueueDead = queueManagerFactory.getQueueManager(deleteQueueDeadScope);
 
         this.indexProcessorFig = indexProcessorFig;
         this.queueFig = queueFig;
@@ -211,24 +224,73 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         start();
     }
 
+    private String getQueueName(AsyncEventQueueType queueType) {
+        switch (queueType) {
+            case REGULAR:
+                return QUEUE_NAME;
+
+            case UTILITY:
+                return QUEUE_NAME_UTILITY;
+
+            case DELETE:
+                return QUEUE_NAME_DELETE;
+
+            default:
+                throw new IllegalArgumentException("Invalid queue type: " + queueType.toString());
+        }
+    }
+
+    private LegacyQueueManager getQueue(AsyncEventQueueType queueType) {
+        return getQueue(queueType, false);
+    }
+
+    private LegacyQueueManager getQueue(AsyncEventQueueType queueType, boolean isDeadQueue) {
+        switch (queueType) {
+            case REGULAR:
+                return isDeadQueue ? indexQueueDead : indexQueue;
+
+            case UTILITY:
+                return isDeadQueue ? utilityQueueDead : utilityQueue;
+
+            case DELETE:
+                return isDeadQueue ? deleteQueueDead : deleteQueue;
+
+            default:
+                throw new IllegalArgumentException("Invalid queue type: " + queueType.toString());
+        }
+    }
+
+    private AtomicLong getCounter(AsyncEventQueueType queueType, boolean isDeadQueue) {
+        switch (queueType) {
+            case REGULAR:
+                return isDeadQueue ? counterIndexDead : counter;
+
+            case UTILITY:
+                return isDeadQueue ? counterUtilityDead : counterUtility;
+
+            case DELETE:
+                return isDeadQueue ? counterDeleteDead : counterDelete;
+
+            default:
+                throw new IllegalArgumentException("Invalid queue type: " + queueType.toString());
+        }
+    }
+
+
 
     /**
      * Offer the EntityIdScope to SQS
      */
     private void offer(final Serializable operation) {
-        offer(operation, false);
+        offer(operation, AsyncEventQueueType.REGULAR);
     }
 
-    private void offer(final Serializable operation, boolean forUtilityQueue) {
+    private void offer(final Serializable operation, AsyncEventQueueType queueType) {
         final Timer.Context timer = this.writeTimer.time();
 
         try {
             //signal to SQS
-            if (forUtilityQueue) {
-                this.indexQueue.sendMessageToLocalRegion(operation);
-            } else {
-                this.indexQueue.sendMessageToLocalRegion(operation);
-            }
+            getQueue(queueType).sendMessageToLocalRegion(operation);
         } catch (IOException e) {
             throw new RuntimeException("Unable to queue message", e);
         } finally {
@@ -238,16 +300,12 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     }
 
 
-    private void offerTopic(final Serializable operation, boolean forUtilityQueue) {
+    private void offerTopic(final Serializable operation, AsyncEventQueueType queueType) {
         final Timer.Context timer = this.writeTimer.time();
 
         try {
             //signal to SQS
-            if (forUtilityQueue) {
-                this.utilityQueue.sendMessageToAllRegions(operation);
-            } else {
-                this.indexQueue.sendMessageToAllRegions(operation);
-            }
+            getQueue(queueType).sendMessageToAllRegions(operation);
         }
         catch ( IOException e ) {
             throw new RuntimeException( "Unable to queue message", e );
@@ -258,15 +316,11 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     }
 
 
-    private void offerBatch(final List operations, boolean forUtilityQueue){
+    private void offerBatch(final List operations, AsyncEventQueueType queueType){
         final Timer.Context timer = this.writeTimer.time();
         try {
             //signal to SQS
-            if( forUtilityQueue ){
-                this.utilityQueue.sendMessages(operations);
-            }else{
-                this.indexQueue.sendMessages(operations);
-            }
+            getQueue(queueType).sendMessages(operations);
         } catch (IOException e) {
             throw new RuntimeException("Unable to queue message", e);
         } finally {
@@ -274,78 +328,24 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         }
     }
 
-    private void offerBatchToUtilityQueue(final List operations){
-        try {
-            //signal to SQS
-            this.utilityQueue.sendMessages(operations);
-        } catch (IOException e) {
-            throw new RuntimeException("Unable to queue message", e);
-        }
-    }
-
 
     /**
      * Take message
      */
-    private List<LegacyQueueMessage> take() {
-
-        final Timer.Context timer = this.readTimer.time();
-
-        try {
-            return indexQueue.getMessages(MAX_TAKE, AsyncEvent.class);
-        }
-        finally {
-            //stop our timer
-            timer.stop();
-        }
-    }
-
-    /**
-     * Take message from SQS utility queue
-     */
-    private List<LegacyQueueMessage> takeFromUtilityQueue() {
-
-        final Timer.Context timer = this.readTimer.time();
-
-        try {
-            return utilityQueue.getMessages(MAX_TAKE, AsyncEvent.class);
-        }
-        finally {
-            //stop our timer
-            timer.stop();
-        }
-    }
-
-    /**
-     * Take message from index dead letter queue
-     */
-    private List<LegacyQueueMessage> takeFromIndexDeadQueue() {
+    private List<LegacyQueueMessage> take(AsyncEventQueueType queueType, boolean isDeadQueue) {
 
         final Timer.Context timer = this.readTimer.time();
 
         try {
-            return indexQueueDead.getMessages(MAX_TAKE, AsyncEvent.class);
+            return getQueue(queueType, isDeadQueue).getMessages(MAX_TAKE, AsyncEvent.class);
         }
         finally {
             //stop our timer
             timer.stop();
         }
     }
-
-    /**
-     * Take message from SQS utility dead letter queue
-     */
-    private List<LegacyQueueMessage> takeFromUtilityDeadQueue() {
-
-        final Timer.Context timer = this.readTimer.time();
-
-        try {
-            return utilityQueueDead.getMessages(MAX_TAKE, AsyncEvent.class);
-        }
-        finally {
-            //stop our timer
-            timer.stop();
-        }
+    private List<LegacyQueueMessage> take(AsyncEventQueueType queueType) {
+        return take(queueType, false);
     }
 
 
@@ -376,42 +376,20 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         }
     }
 
-
-    /**
-     * calls the event handlers and returns a result with information on whether
-     * it needs to be ack'd and whether it needs to be indexed
-     * Ack message in SQS
-     */
-    public void ackUtilityQueue(final List<LegacyQueueMessage> messages) {
-        try{
-            utilityQueue.commitMessages( messages );
-        }catch(Exception e){
-            throw new RuntimeException("Unable to ack messages", e);
+    public void ack(final List<LegacyQueueMessage> messages, AsyncEventQueueType queueType, boolean isDeadQueue) {
+        if (queueType == AsyncEventQueueType.REGULAR && !isDeadQueue) {
+            ack(messages); // the regular live queue keeps its dedicated ack(messages) path
+            return; // already acked above -- falling through would commit the same batch a second time
         }
-    }
-
-    /**
-     * ack messages in index dead letter queue
-     */
-    public void ackIndexDeadQueue(final List<LegacyQueueMessage> messages) {
-        try{
-            indexQueueDead.commitMessages( messages );
-        }catch(Exception e){
-            throw new RuntimeException("Unable to ack messages", e);
+        try {
+            getQueue(queueType, isDeadQueue).commitMessages( messages );
         }
-    }
-
-    /**
-     * ack messages in utility dead letter queue
-     */
-    public void ackUtilityDeadQueue(final List<LegacyQueueMessage> messages) {
-        try{
-            utilityQueueDead.commitMessages( messages );
-        }catch(Exception e){
+        catch (Exception e) {
             throw new RuntimeException("Unable to ack messages", e);
         }
     }
 
+
     /**
      * calls the event handlers and returns a result with information on whether
      * it needs to be ack'd and whether it needs to be indexed
@@ -555,7 +533,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
             applicationScope.getApplication().getUuid(), applicationScope.getApplication().getType());
 
         offerTopic( new InitializeApplicationIndexEvent( queueFig.getPrimaryRegion(),
-            new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ), false);
+            new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ), AsyncEventQueueType.REGULAR);
     }
 
 
@@ -645,7 +623,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
             edge.getType(), edge.getTargetNode().getUuid(), edge.getTargetNode().getType());
 
         // sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op
-        offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ) );
+        offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ), AsyncEventQueueType.DELETE );
     }
 
     private IndexOperationMessage  handleEdgeDelete(final LegacyQueueMessage message) {
@@ -678,9 +656,9 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     /**
      * Queue up an indexOperationMessage for multi region execution
      * @param indexOperationMessage
-     * @param forUtilityQueue
+     * @param queueType
      */
-    public void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage, boolean forUtilityQueue) {
+    public void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage, AsyncEventQueueType queueType) {
 
         // don't try to produce something with nothing
         if(indexOperationMessage == null || indexOperationMessage.isEmpty()){
@@ -706,7 +684,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
         logger.trace("Offering ElasticsearchIndexEvent for message {}", newMessageId );
 
-        offerTopic( elasticsearchIndexEvent, forUtilityQueue );
+        offerTopic( elasticsearchIndexEvent, queueType );
     }
 
     private void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent)
@@ -782,7 +760,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
             applicationScope.getApplication().getUuid(), entityId.getUuid(), entityId.getType());
 
         offerTopic( new DeIndexOldVersionsEvent( queueFig.getPrimaryRegion(),
-            new EntityIdScope( applicationScope, entityId), markedVersion), false);
+            new EntityIdScope( applicationScope, entityId), markedVersion), AsyncEventQueueType.DELETE);
 
     }
 
@@ -844,7 +822,8 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         }
 
         // sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op
-        offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ) );
+        offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ) ,
+            AsyncEventQueueType.DELETE);
     }
 
     private IndexOperationMessage handleEntityDelete(final LegacyQueueMessage message) {
@@ -883,7 +862,8 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         }
 
         // sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op
-        offer(new CollectionDeleteEvent(queueFig.getPrimaryRegion(), collectionScope, collectionVersion), true);
+        offer(new CollectionDeleteEvent(queueFig.getPrimaryRegion(), collectionScope, collectionVersion),
+            AsyncEventQueueType.DELETE);
     }
 
     private void handleCollectionDelete(final LegacyQueueMessage message) {
@@ -932,7 +912,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
                 offer(new EntityDeleteEvent(queueFig.getPrimaryRegion(),
                     new EntityIdScope(applicationScope, edgeScope.getEdge().getTargetNode()),false),
-                    true);
+                    AsyncEventQueueType.DELETE);
                 count.incrementAndGet();
             }).toBlocking().lastOrDefault(null);
 
@@ -940,7 +920,8 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
         if (count.intValue() >= maxDeletes) {
             // requeue collection delete for next chunk of deletes
-            offer (new CollectionDeleteEvent(queueFig.getPrimaryRegion(), collectionScope, collectionVersion), true);
+            offer (new CollectionDeleteEvent(queueFig.getPrimaryRegion(), collectionScope, collectionVersion),
+                AsyncEventQueueType.DELETE);
         }
     }
 
@@ -965,29 +946,39 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     public void start() {
         final int indexCount = indexProcessorFig.getWorkerCount();
         final int utilityCount = indexProcessorFig.getWorkerCountUtility();
+        final int deleteCount = indexProcessorFig.getWorkerCountDelete();
         final int indexDeadCount = indexProcessorFig.getWorkerCountDeadLetter();
         final int utilityDeadCount = indexProcessorFig.getWorkerCountUtilityDeadLetter();
+        final int deleteDeadCount = indexProcessorFig.getWorkerCountDeleteDeadLetter();
 
         for (int i = 0; i < indexCount; i++) {
-            startWorker(QUEUE_NAME);
+            startWorker(AsyncEventQueueType.REGULAR);
         }
 
         for (int i = 0; i < utilityCount; i++) {
-            startWorker(QUEUE_NAME_UTILITY);
+            startWorker(AsyncEventQueueType.UTILITY);
+        }
+
+        for (int i = 0; i < deleteCount; i++) {
+            startWorker(AsyncEventQueueType.DELETE);
         }
 
         if( indexQueue instanceof SNSQueueManagerImpl ) {
-            logger.info("Queue manager implementation supports dead letters, start dead letter queue worker.");
+            logger.info("Queue manager implementation supports dead letters, start dead letter queue workers.");
             for (int i = 0; i < indexDeadCount; i++) {
-                startDeadQueueWorker(QUEUE_NAME);
+                startDeadQueueWorker(AsyncEventQueueType.REGULAR);
+            }
+
+            for (int i = 0; i < utilityDeadCount; i++) {
+                startDeadQueueWorker(AsyncEventQueueType.UTILITY);
+            }
+
+            for (int i = 0; i < deleteDeadCount; i++) {
+                startDeadQueueWorker(AsyncEventQueueType.DELETE);
             }
         }else{
             logger.info("Queue manager implementation does NOT support dead letters, NOT starting dead letter queue worker.");
         }
-
-        for (int i = 0; i < utilityDeadCount; i++) {
-            startDeadQueueWorker(QUEUE_NAME_UTILITY);
-        }
     }
 
 
@@ -1005,11 +996,10 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     }
 
 
-    private void startWorker(final String type) {
-        Preconditions.checkNotNull(type, "Worker type required");
+    private void startWorker(final AsyncEventQueueType queueType) {
         synchronized (mutex) {
 
-            boolean isUtilityQueue = isNotEmpty(type) && type.toLowerCase().contains(QUEUE_NAME_UTILITY.toLowerCase());
+            String type = getQueueName(queueType);
 
             Observable<List<LegacyQueueMessage>> consumer =
                 Observable.create( new Observable.OnSubscribe<List<LegacyQueueMessage>>() {
@@ -1017,20 +1007,15 @@ public class AsyncEventServiceImpl implements AsyncEventService {
                     public void call( final Subscriber<? super List<LegacyQueueMessage>> subscriber ) {
 
                         //name our thread so it's easy to see
-                        long threadNum = isUtilityQueue ?
-                            counterUtility.incrementAndGet() : counter.incrementAndGet();
-                        Thread.currentThread().setName( "QueueConsumer_" + type+ "_" + threadNum );
+                        long threadNum = getCounter(queueType, false).incrementAndGet();
+                        Thread.currentThread().setName( "QueueConsumer_" + type + "_" + threadNum );
 
                         List<LegacyQueueMessage> drainList = null;
 
                         do {
                             try {
-                                if ( isUtilityQueue ){
-                                    drainList = takeFromUtilityQueue();
-                                }else{
-                                    drainList = take();
+                                drainList = take(queueType);
 
-                                }
                                 //emit our list in it's entity to hand off to a worker pool
                                 subscriber.onNext(drainList);
 
@@ -1086,7 +1071,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
                                     // submit the processed messages to index producer
                                     List<LegacyQueueMessage> messagesToAck =
-                                        submitToIndex( indexEventResults, isUtilityQueue );
+                                        submitToIndex( indexEventResults, queueType );
 
                                     if ( messagesToAck.size() < messages.size() ) {
                                         logger.warn(
@@ -1096,12 +1081,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
                                     // ack each message if making it to this point
                                     if( messagesToAck.size() > 0 ){
-
-                                        if ( isUtilityQueue ){
-                                            ackUtilityQueue( messagesToAck );
-                                        }else{
-                                            ack( messagesToAck );
-                                        }
+                                        ack(messagesToAck, queueType, false);
                                     }
 
                                     return messagesToAck;
@@ -1125,31 +1105,25 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     }
 
 
-    private void startDeadQueueWorker(final String type) {
-        Preconditions.checkNotNull(type, "Worker type required");
+    private void startDeadQueueWorker(final AsyncEventQueueType queueType) {
+        String type = getQueueName(queueType);
         synchronized (mutex) {
 
-            boolean isUtilityDeadQueue = isNotEmpty(type) && type.toLowerCase().contains(QUEUE_NAME_UTILITY.toLowerCase());
-
             Observable<List<LegacyQueueMessage>> consumer =
                     Observable.create( new Observable.OnSubscribe<List<LegacyQueueMessage>>() {
                         @Override
                         public void call( final Subscriber<? super List<LegacyQueueMessage>> subscriber ) {
 
                             //name our thread so it's easy to see
-                            long threadNum = isUtilityDeadQueue ?
-                                counterUtilityDead.incrementAndGet() : counterIndexDead.incrementAndGet();
-                            Thread.currentThread().setName( "QueueDeadLetterConsumer_" + type+ "_" + threadNum );
+                            long threadNum = getCounter(queueType, true).incrementAndGet();
+                            Thread.currentThread().setName( "QueueDeadLetterConsumer_" + type + "_" + threadNum );
 
                             List<LegacyQueueMessage> drainList = null;
 
                             do {
                                 try {
-                                    if ( isUtilityDeadQueue ){
-                                        drainList = takeFromUtilityDeadQueue();
-                                    }else{
-                                        drainList = takeFromIndexDeadQueue();
-                                    }
+                                    drainList = take(queueType, true);
+
                                     //emit our list in it's entity to hand off to a worker pool
                                     subscriber.onNext(drainList);
 
@@ -1198,18 +1172,11 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
                                                  try {
                                                      // put the dead letter messages back in the appropriate queue
-                                                     LegacyQueueManager returnQueue = null;
-                                                     String queueType;
-                                                     if (isUtilityDeadQueue) {
-                                                         returnQueue = utilityQueue;
-                                                         queueType = "utility";
-                                                     } else {
-                                                         returnQueue = indexQueue;
-                                                         queueType = "index";
-                                                     }
+                                                     LegacyQueueManager returnQueue = getQueue(queueType, false);
+
                                                      List<LegacyQueueMessage> successMessages = returnQueue.sendQueueMessages(messages);
                                                      for (LegacyQueueMessage msg : successMessages) {
-                                                         logger.warn("Returning message to {} queue: type:{}, messageId:{} body: {}", queueType, msg.getType(), msg.getMessageId(), msg.getStringBody());
+                                                         logger.warn("Returning message to {} queue: type:{}, messageId:{} body: {}", queueType.toString(), msg.getType(), msg.getMessageId(), msg.getStringBody());
                                                      }
                                                      int unsuccessfulMessagesSize = messages.size() - successMessages.size();
                                                      if (unsuccessfulMessagesSize > 0) {
@@ -1226,16 +1193,12 @@ public class AsyncEventServiceImpl implements AsyncEventService {
                                                          for (LegacyQueueMessage msg : messages) {
                                                              String messageId = msg.getMessageId();
                                                              if (!successMessageIds.contains(messageId)) {
-                                                                 logger.warn("Failed to return message to {} queue: type:{} messageId:{} body: {}", queueType, msg.getType(), messageId, msg.getStringBody());
+                                                                 logger.warn("Failed to return message to {} queue: type:{} messageId:{} body: {}", queueType.toString(), msg.getType(), messageId, msg.getStringBody());
                                                              }
                                                          }
                                                      }
 
-                                                     if (isUtilityDeadQueue) {
-                                                         ackUtilityDeadQueue(successMessages);
-                                                     } else {
-                                                         ackIndexDeadQueue(successMessages);
-                                                     }
+                                                     ack(successMessages, queueType, true);
 
                                                      return messages;
                                                  }
@@ -1261,7 +1224,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
      * Submit results to index and return the queue messages to be ack'd
      *
      */
-    private List<LegacyQueueMessage> submitToIndex(List<IndexEventResult> indexEventResults, boolean forUtilityQueue) {
+    private List<LegacyQueueMessage> submitToIndex(List<IndexEventResult> indexEventResults, AsyncEventQueueType queueType) {
 
         // if nothing came back then return empty list
         if(indexEventResults==null){
@@ -1288,7 +1251,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
             // collect into a list of QueueMessages that can be ack'd later
             .collect(Collectors.toList());
 
-       queueIndexOperationMessage(combined, forUtilityQueue);
+        queueIndexOperationMessage(combined, queueType);
 
         return queueMessages;
     }
@@ -1299,10 +1262,10 @@ public class AsyncEventServiceImpl implements AsyncEventService {
             new EntityIndexOperation( applicationScope, id, updatedSince);
 
         queueIndexOperationMessage(
-            eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null), false);
+            eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null), AsyncEventQueueType.REGULAR);
     }
 
-    public void indexBatch(final List<EdgeScope> edges, final long updatedSince, boolean forUtilityQueue) {
+    public void indexBatch(final List<EdgeScope> edges, final long updatedSince, AsyncEventQueueType queueType) {
 
         final List<EntityIndexEvent> batch = new ArrayList<>();
         edges.forEach(e -> {
@@ -1315,7 +1278,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
         logger.trace("Offering batch of EntityIndexEvent of size {}", batch.size());
 
-        offerBatch( batch, forUtilityQueue );
+        offerBatch( batch, queueType );
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/221b99dd/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index 7eecf04..eb63056 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -40,10 +40,14 @@ public interface IndexProcessorFig extends GuicyFig {
 
     String ELASTICSEARCH_WORKER_COUNT_UTILITY = "elasticsearch.worker_count_utility";
 
+    String ELASTICSEARCH_WORKER_COUNT_DELETE = "elasticsearch.worker_count_delete";
+
     String ELASTICSEARCH_WORKER_COUNT_DEADLETTER = "elasticsearch.worker_count_deadletter";
 
     String ELASTICSEARCH_WORKER_COUNT_UTILITY_DEADLETTER = "elasticsearch.worker_count_utility_deadletter";
 
+    String ELASTICSEARCH_WORKER_COUNT_DELETE_DEADLETTER = "elasticsearch.worker_count_delete_deadletter";
+
     String EVENT_CONCURRENCY_FACTOR = "event.concurrency.factor";
 
     String ELASTICSEARCH_QUEUE_IMPL = "elasticsearch.queue_impl";
@@ -105,6 +109,13 @@ public interface IndexProcessorFig extends GuicyFig {
     int getWorkerCountUtility();
 
     /**
+     * The number of worker threads used to read delete requests from the queue.
+     */
+    @Default("1")
+    @Key(ELASTICSEARCH_WORKER_COUNT_DELETE)
+    int getWorkerCountDelete();
+
+    /**
      * The number of worker threads used to read dead messages from the index dead letter queue and reload them into the index queue.
      */
     @Default("1")
@@ -119,6 +130,13 @@ public interface IndexProcessorFig extends GuicyFig {
     int getWorkerCountUtilityDeadLetter();
 
     /**
+     * The number of worker threads used to read dead messages from the delete dead letter queue and reload them into the delete queue.
+     */
+    @Default("1")
+    @Key(ELASTICSEARCH_WORKER_COUNT_DELETE_DEADLETTER)
+    int getWorkerCountDeleteDeadLetter();
+
+    /**
      * Set the implementation to use for queuing.
      * Valid values: TEST, LOCAL, SQS, SNS
      * NOTE: SQS and SNS equate to the same implementation of Amazon queue services.

http://git-wip-us.apache.org/repos/asf/usergrid/blob/221b99dd/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
index 2b3573e..d6bdd93 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.corepersistence.index;
 
 
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType;
 import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -42,7 +43,7 @@ public interface ReIndexAction {
      * Index a batch list of entities.  Goes to the utility queue.
      * @param edges
      * @param updatedSince
-     * @param forUtilityQueue
+     * @param queueType
      */
-    void indexBatch(final List<EdgeScope> edges, final long updatedSince, boolean forUtilityQueue);
+    void indexBatch(final List<EdgeScope> edges, final long updatedSince, AsyncEventQueueType queueType);
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/221b99dd/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index 0660d5e..c7371b3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -169,7 +170,7 @@ public class ReIndexServiceImpl implements ReIndexService {
             .buffer( indexProcessorFig.getReindexBufferSize())
             .doOnNext( edgeScopes -> {
                 logger.info("Sending batch of {} to be indexed.", edgeScopes.size());
-                indexService.indexBatch(edgeScopes, modifiedSince, true);
+                indexService.indexBatch(edgeScopes, modifiedSince, AsyncEventQueueType.UTILITY);
                 count.addAndGet(edgeScopes.size() );
                 if( edgeScopes.size() > 0 ) {
                     writeCursorState(jobId, edgeScopes.get(edgeScopes.size() - 1));

http://git-wip-us.apache.org/repos/asf/usergrid/blob/221b99dd/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
index e9aa6c8..b1b7f75 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.corepersistence.pipeline.read.traverse;
 
 
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType;
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
 import org.apache.usergrid.corepersistence.asyncevents.EventBuilderImpl;
@@ -122,7 +123,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
 
                     logger.info("Edge {} is deleted when seeking, deleting the edge", markedEdge);
                     final IndexOperationMessage indexOperationMessage = eventBuilder.buildDeleteEdge(applicationScope, markedEdge);
-                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, true);
+                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, AsyncEventQueueType.DELETE);
 
                 }
 
@@ -132,7 +133,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
                     logger.info("Edge {} has a deleted source node, deleting the entity for id {}", markedEdge, sourceNodeId);
 
                     final IndexOperationMessage indexOperationMessage = eventBuilder.buildEntityDelete(applicationScope, sourceNodeId);
-                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, true);
+                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, AsyncEventQueueType.DELETE);
 
                 }
 
@@ -142,7 +143,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
                     logger.info("Edge {} has a deleted target node, deleting the entity for id {}", markedEdge, targetNodeId);
 
                     final IndexOperationMessage indexOperationMessage = eventBuilder.buildEntityDelete(applicationScope, targetNodeId);
-                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, true);
+                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, AsyncEventQueueType.DELETE);
                 }
 
 
@@ -227,13 +228,13 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
         }
     }
 
-    private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector() {
+    private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector(AsyncEventQueueType queueType) {
 
         return observable -> observable
             .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
             .filter(msg -> !msg.isEmpty())
             .doOnNext(indexOperation -> {
-                asyncEventService.queueIndexOperationMessage(indexOperation, false);
+                asyncEventService.queueIndexOperationMessage(indexOperation, queueType);
             });
 
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/221b99dd/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
index 1b662cc..c75545e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.corepersistence.pipeline.read.traverse;
 
 
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType;
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
 import org.apache.usergrid.corepersistence.asyncevents.EventBuilderImpl;
@@ -122,7 +123,7 @@ public abstract class AbstractReadReverseGraphFilter extends AbstractPathFilter<
 
                     logger.info("Edge {} is deleted when seeking, deleting the edge", markedEdge);
                     final IndexOperationMessage indexOperationMessage = eventBuilder.buildDeleteEdge(applicationScope, markedEdge);
-                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, true);
+                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, AsyncEventQueueType.DELETE);
 
                 }
 
@@ -132,7 +133,7 @@ public abstract class AbstractReadReverseGraphFilter extends AbstractPathFilter<
                     logger.info("Edge {} has a deleted source node, deleting the entity for id {}", markedEdge, sourceNodeId);
 
                     final IndexOperationMessage indexOperationMessage = eventBuilder.buildEntityDelete(applicationScope, sourceNodeId);
-                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, true);
+                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, AsyncEventQueueType.DELETE);
 
                 }
 
@@ -142,7 +143,7 @@ public abstract class AbstractReadReverseGraphFilter extends AbstractPathFilter<
                     logger.info("Edge {} has a deleted target node, deleting the entity for id {}", markedEdge, targetNodeId);
 
                     final IndexOperationMessage indexOperationMessage = eventBuilder.buildEntityDelete(applicationScope, targetNodeId);
-                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, true);
+                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, AsyncEventQueueType.DELETE);
 
                 }
 
@@ -221,13 +222,13 @@ public abstract class AbstractReadReverseGraphFilter extends AbstractPathFilter<
         }
     }
 
-    private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector() {
+    private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector(AsyncEventQueueType queueType) {
 
         return observable -> observable
             .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
             .filter(msg -> !msg.isEmpty())
             .doOnNext(indexOperation -> {
-                asyncEventService.queueIndexOperationMessage(indexOperation, false);
+                asyncEventService.queueIndexOperationMessage(indexOperation, queueType);
             });
 
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/221b99dd/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/CollectionVersionTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/CollectionVersionTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/CollectionVersionTest.java
index 0278708..a3c7284 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/CollectionVersionTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/CollectionVersionTest.java
@@ -19,5 +19,14 @@
 
 package org.apache.usergrid.corepersistence.index;
 
+import org.junit.runner.RunWith;
+import net.jcip.annotations.NotThreadSafe;
+import org.apache.usergrid.corepersistence.TestIndexModule;
+import org.apache.usergrid.persistence.core.test.UseModules;
+import org.apache.usergrid.persistence.index.impl.EsRunner;
+
+@RunWith( EsRunner.class )
+@UseModules( { TestIndexModule.class } )
+@NotThreadSafe
 public class CollectionVersionTest {
 }


[2/2] usergrid git commit: Add collection version testing and new _version endpoint to retrieve the version for a collection.

Posted by md...@apache.org.
Add collection version testing and new _version endpoint to retrieve the version for a collection.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/b6d14069
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/b6d14069
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/b6d14069

Branch: refs/heads/collectionDelete
Commit: b6d14069aa500530d5b86112efd732031e52b0e8
Parents: 221b99d
Author: Mike Dunker <md...@google.com>
Authored: Mon Jul 17 14:54:31 2017 -0700
Committer: Mike Dunker <md...@google.com>
Committed: Mon Jul 17 14:54:31 2017 -0700

----------------------------------------------------------------------
 .../usergrid/corepersistence/CoreModule.java    |   2 +-
 .../corepersistence/CpEntityManager.java        |  17 +-
 .../corepersistence/CpEntityManagerFactory.java |   6 +-
 .../index/CollectionClearService.java           |  35 ++++
 .../index/CollectionClearServiceImpl.java       |  69 +++++++
 .../index/CollectionDeleteService.java          |  30 ----
 .../index/CollectionDeleteServiceImpl.java      |  57 ------
 .../index/CollectionVersionManagerImpl.java     |   4 +-
 .../usergrid/persistence/EntityManager.java     |   4 +-
 .../org/apache/usergrid/persistence/Query.java  |  18 +-
 .../index/CollectionVersionTest.java            |  32 ----
 .../persistence/index/query/Identifier.java     |   3 +-
 .../rest/applications/CollectionResource.java   |  58 +++++-
 .../collection/CollectionClearTest.java         | 179 +++++++++++++++++++
 14 files changed, 374 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/b6d14069/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index 5515abd..6a93af5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -143,7 +143,7 @@ public class CoreModule extends AbstractModule {
 
         bind( ReIndexService.class ).to( ReIndexServiceImpl.class );
 
-        bind( CollectionDeleteService.class ).to( CollectionDeleteServiceImpl.class );
+        bind( CollectionClearService.class ).to( CollectionClearServiceImpl.class );
 
         bind( ExportService.class ).to( ExportServiceImpl.class );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b6d14069/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index a76720d..e192939 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -158,7 +158,7 @@ public class CpEntityManager implements EntityManager {
     private EntityCollectionManager ecm;
 
     public QueueManagerFactory queueManagerFactory;
-    private CollectionDeleteService collectionDeleteService;
+    private CollectionClearService collectionClearService;
     private CollectionVersionManagerFactory collectionVersionManagerFactory;
 
 
@@ -187,7 +187,7 @@ public class CpEntityManager implements EntityManager {
                             final CollectionSettingsFactory collectionSettingsFactory,
                             final UUID applicationId,
                             final QueueManagerFactory queueManagerFactory,
-                            final CollectionDeleteService collectionDeleteService,
+                            final CollectionClearService collectionClearService,
                             final CollectionVersionManagerFactory collectionVersionManagerFactory) {
 
         this.entityManagerFig = entityManagerFig;
@@ -255,7 +255,7 @@ public class CpEntityManager implements EntityManager {
         this.skipAggregateCounters = false;
 
         this.queueManagerFactory = queueManagerFactory;
-        this.collectionDeleteService = collectionDeleteService;
+        this.collectionClearService = collectionClearService;
         this.collectionVersionManagerFactory = collectionVersionManagerFactory;
     }
 
@@ -1890,9 +1890,16 @@ public class CpEntityManager implements EntityManager {
     }
 
     @Override
-    public void deleteCollection( String collectionName ){
+    public void clearCollection(String collectionName ){
 
-        collectionDeleteService.deleteCollection(applicationId, collectionName);
+        collectionClearService.clearCollection(applicationId, collectionName);
+
+    }
+
+    @Override
+    public String getCollectionVersion(String collectionName ){
+
+        return collectionClearService.getCollectionVersion(applicationId, collectionName);
 
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b6d14069/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index b3dac57..2e3b180 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -117,7 +117,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
     private final CollectionSettingsFactory collectionSettingsFactory;
     private ActorSystemManager actorSystemManager;
     private final LockManager lockManager;
-    private final CollectionDeleteService collectionDeleteService;
+    private final CollectionClearService collectionClearService;
     private final CollectionVersionManagerFactory collectionVersionManagerFactory;
 
     private final QueueManagerFactory queueManagerFactory;
@@ -143,7 +143,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
         this.collectionService          = injector.getInstance( CollectionService.class );
         this.connectionService          = injector.getInstance( ConnectionService.class );
         this.collectionSettingsFactory  = injector.getInstance( CollectionSettingsFactory.class );
-        this.collectionDeleteService    = injector.getInstance( CollectionDeleteService.class );
+        this.collectionClearService     = injector.getInstance( CollectionClearService.class );
         this.collectionVersionManagerFactory = injector.getInstance( CollectionVersionManagerFactory.class );
 
         Properties properties = cassandraService.getProperties();
@@ -395,7 +395,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
             collectionSettingsFactory,
             applicationId,
             queueManagerFactory,
-            collectionDeleteService,
+            collectionClearService,
             collectionVersionManagerFactory);
 
         return em;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b6d14069/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionClearService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionClearService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionClearService.java
new file mode 100644
index 0000000..76380e5
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionClearService.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+import java.util.UUID;
+
+public interface CollectionClearService {
+
+    /**
+     * Clear the entities from the current version of a collection by changing the collection version and queueing up a delete of the old entities
+     */
+    void clearCollection(final UUID applicationID, final String baseCollectionName);
+
+    /**
+     * Get the current version of a collection
+     */
+    String getCollectionVersion(final UUID applicationID, final String baseCollectionName);
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b6d14069/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionClearServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionClearServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionClearServiceImpl.java
new file mode 100644
index 0000000..ff64d6a
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionClearServiceImpl.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+import com.google.inject.Inject;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+
+public class CollectionClearServiceImpl implements CollectionClearService {
+    private static final Logger logger = LoggerFactory.getLogger(CollectionClearServiceImpl.class );
+
+    private CollectionVersionManagerFactory collectionVersionManagerFactory;
+    private AsyncEventService asyncEventService;
+
+    @Inject
+    public CollectionClearServiceImpl(
+        final CollectionVersionManagerFactory collectionVersionManagerFactory,
+        final AsyncEventService asyncEventService
+    )
+    {
+        this.collectionVersionManagerFactory = collectionVersionManagerFactory;
+        this.asyncEventService = asyncEventService;
+    }
+
+    @Override
+    public void clearCollection(final UUID applicationID, final String baseCollectionName) {
+        CollectionScope scope = new CollectionScopeImpl(applicationID, baseCollectionName);
+        CollectionVersionManager collectionVersionManager = collectionVersionManagerFactory.getInstance(scope);
+
+        // change version
+        String oldVersion = collectionVersionManager.updateCollectionVersion();
+        logger.info("Collection cleared: appID:{} baseCollectionName:{} oldVersion:{} newVersion:{}",
+            applicationID.toString(), baseCollectionName, oldVersion, collectionVersionManager.getCollectionVersion(false));
+
+        // queue up delete of old version entities
+        asyncEventService.queueCollectionDelete(scope, oldVersion);
+    }
+
+    @Override
+    public String getCollectionVersion(UUID applicationID, String baseCollectionName) {
+        CollectionScope scope = new CollectionScopeImpl(applicationID, baseCollectionName);
+        CollectionVersionManager collectionVersionManager = collectionVersionManagerFactory.getInstance(scope);
+
+        String currentVersion = collectionVersionManager.getCollectionVersion(true);
+
+        return currentVersion;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b6d14069/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteService.java
deleted file mode 100644
index 85b8fed..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteService.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.index;
-
-import java.util.UUID;
-
-public interface CollectionDeleteService {
-
-    /**
-     * Delete the current version of a collection by changing the collection version and queueing up a delete of the old entities
-     */
-    void deleteCollection(final UUID applicationID, final String baseCollectionName);
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b6d14069/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteServiceImpl.java
deleted file mode 100644
index 5c64079..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteServiceImpl.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.index;
-
-import com.google.inject.Inject;
-import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.UUID;
-
-public class CollectionDeleteServiceImpl implements CollectionDeleteService {
-    private static final Logger logger = LoggerFactory.getLogger(CollectionDeleteServiceImpl.class );
-
-    private CollectionVersionManagerFactory collectionVersionManagerFactory;
-    private AsyncEventService asyncEventService;
-
-    @Inject
-    public CollectionDeleteServiceImpl(
-        final CollectionVersionManagerFactory collectionVersionManagerFactory,
-        final AsyncEventService asyncEventService
-    )
-    {
-        this.collectionVersionManagerFactory = collectionVersionManagerFactory;
-        this.asyncEventService = asyncEventService;
-    }
-
-    @Override
-    public void deleteCollection(final UUID applicationID, final String baseCollectionName) {
-        CollectionScope scope = new CollectionScopeImpl(applicationID, baseCollectionName);
-        CollectionVersionManager collectionVersionManager = collectionVersionManagerFactory.getInstance(scope);
-
-        // change version
-        String oldVersion = collectionVersionManager.updateCollectionVersion();
-
-        // queue up delete of old version entities
-        asyncEventService.queueCollectionDelete(scope, oldVersion);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b6d14069/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManagerImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManagerImpl.java
index 7ed557c..c5bb417 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManagerImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManagerImpl.java
@@ -95,7 +95,7 @@ public class CollectionVersionManagerImpl implements CollectionVersionManager {
         }
 
         String oldCollectionVersion = getCollectionVersion(true);
-        String newCollectionVersion = getNewCollectionVersion();
+        String newCollectionVersion = generateNewCollectionVersion();
         mapManager.putLong(MAP_PREFIX_LAST_CHANGED+collectionName, System.currentTimeMillis());
         mapManager.putString(MAP_PREFIX_VERSION+collectionName, newCollectionVersion);
         cache.put(scope, newCollectionVersion);
@@ -104,7 +104,7 @@ public class CollectionVersionManagerImpl implements CollectionVersionManager {
         return oldCollectionVersion;
     }
 
-    private static String getNewCollectionVersion() {
+    private static String generateNewCollectionVersion() {
         return UUIDGenerator.newTimeUUID().toString();
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b6d14069/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
index a977f31..f55263d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
@@ -530,7 +530,9 @@ public interface EntityManager {
 
     Object getCollectionSettings( String collectionName );
 
-    void deleteCollection( String collectionName );
+    void clearCollection(String collectionName);
+
+    String getCollectionVersion(String collectionName);
 
     public void grantRolePermission( String roleName, String permission ) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b6d14069/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java b/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java
index 900bda5..5662f06 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java
@@ -31,6 +31,8 @@ import org.apache.usergrid.persistence.index.query.tree.Operand;
 import org.apache.usergrid.persistence.index.utils.ClassUtils;
 import org.apache.usergrid.persistence.index.utils.ListUtils;
 import org.apache.usergrid.persistence.index.utils.MapUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -41,7 +43,7 @@ import java.util.Map.Entry;
 
 
 public class Query {
-
+    private static final Logger logger = LoggerFactory.getLogger(Query.class);
 
 
     public enum Level {
@@ -319,6 +321,13 @@ public class Query {
 
 
     public static Query fromIdentifier( Object id ) {
+        if (id == null) {
+            throw new IllegalArgumentException("null identifier passed in");
+        }
+        Identifier objectIdentifier = Identifier.from(id);
+        if (objectIdentifier == null) {
+            throw new IllegalArgumentException("Supplied id results in null Identifier");
+        }
         Query q = new Query();
         q.addIdentifier( Identifier.from(id) );
         return q;
@@ -409,6 +418,10 @@ public class Query {
         }
 
         for ( Identifier identifier : identifiers ) {
+            if (identifier == null) {
+                logger.error("containsUuidIdentifiersOnly(): identifier in identifiers list is null");
+                return false;
+            }
             if ( !identifier.isUUID() ) {
                 return false;
             }
@@ -635,6 +648,9 @@ public class Query {
         if ( identifiers == null ) {
             identifiers = new ArrayList<Identifier>();
         }
+        if (identifier == null) {
+            throw new IllegalArgumentException("adding null identifier is not allowed");
+        }
         identifiers.add( identifier );
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b6d14069/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/CollectionVersionTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/CollectionVersionTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/CollectionVersionTest.java
deleted file mode 100644
index a3c7284..0000000
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/CollectionVersionTest.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.index;
-
-import org.junit.runner.RunWith;
-import net.jcip.annotations.NotThreadSafe;
-import org.apache.usergrid.corepersistence.TestIndexModule;
-import org.apache.usergrid.persistence.core.test.UseModules;
-import org.apache.usergrid.persistence.index.impl.EsRunner;
-
-@RunWith( EsRunner.class )
-@UseModules( { TestIndexModule.class } )
-@NotThreadSafe
-public class CollectionVersionTest {
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b6d14069/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Identifier.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Identifier.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Identifier.java
index 84a28f0..70d2284 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Identifier.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Identifier.java
@@ -34,6 +34,7 @@ public class Identifier implements Serializable {
     public static final String UUID_REX =
             "[A-Fa-f0-9]{8}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{12}";
     public static final String EMAIL_REX =  "[a-zA-Z0-9._%'+\\-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}";
+    public static final String NAME_REX = "[a-zA-Z0-9_\\-./'+ ]*";
 
     public enum Type {
         UUID, NAME, EMAIL
@@ -46,7 +47,7 @@ public class Identifier implements Serializable {
     static Pattern emailRegEx = Pattern.compile( EMAIL_REX );
     // "Pattern nameRegEx" below used to be [a-zA-Z0-9_\\-./], changed it to contain a 'space' to a
     // ddress https://issues.apache.org/jira/browse/USERGRID-94
-    static Pattern nameRegEx = Pattern.compile( "[a-zA-Z0-9_\\-./'+ ]*" );
+    static Pattern nameRegEx = Pattern.compile( NAME_REX );
 
 
     private Identifier( Type type, Object value ) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b6d14069/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
index 0ab0661..86b3216 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
@@ -51,6 +51,8 @@ import org.apache.usergrid.services.ServicePayload;
 
 import com.fasterxml.jackson.jaxrs.json.annotation.JSONP;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.UUID;
 
 
@@ -75,7 +77,7 @@ public class CollectionResource extends ServiceResource {
 
 
     @POST
-    @Path("{itemName}/clear")
+    @Path("{itemName}/_clear")
     @Produces({MediaType.APPLICATION_JSON, "application/javascript"})
     @RequireApplicationAccess
     public ApiResponse executeClearCollection(
@@ -84,16 +86,16 @@ public class CollectionResource extends ServiceResource {
         @QueryParam(CONFIRM_COLLECTION_NAME) String confirmCollectionName) throws Exception {
 
         if (logger.isTraceEnabled()){
-            logger.trace( "CollectionResource.executeDeleteOnCollection" );
+            logger.trace( "CollectionResource.executeClearCollection" );
         }
 
-        if (!Application.isCustomCollectionName(itemName.toString())) {
+        if (!Application.isCustomCollectionName(itemName.getPath())) {
             throw new IllegalArgumentException(
                 "Cannot clear built-in collections (" + itemName + ")."
             );
         }
 
-        if (!itemName.toString().equals(confirmCollectionName)) {
+        if (!itemName.getPath().equals(confirmCollectionName)) {
             throw new IllegalArgumentException(
                 "Cannot delete collection without supplying correct collection name in query parameter " + CONFIRM_COLLECTION_NAME
             );
@@ -103,7 +105,7 @@ public class CollectionResource extends ServiceResource {
 
         UUID applicationId = getApplicationId();
 
-        emf.getEntityManager(applicationId).deleteCollection(itemName.toString());
+        emf.getEntityManager(applicationId).clearCollection(itemName.getPath());
 
         if (logger.isTraceEnabled()) {
             logger.trace("CollectionResource.executeDeleteOnCollection() deleted, appId={} collection={}",
@@ -111,12 +113,54 @@ public class CollectionResource extends ServiceResource {
         }
 
         ApiResponse response = createApiResponse();
-        response.setAction("delete");
+        response.setAction("post");
         response.setApplication(emf.getEntityManager( applicationId ).getApplication());
         response.setParams(ui.getQueryParameters());
 
         if (logger.isTraceEnabled()) {
-            logger.trace("CollectionResource.executeDeleteOnCollection() sending response");
+            logger.trace("CollectionResource.executeClearCollection() sending response");
+        }
+
+        return response;
+
+    }
+
+    @GET
+    @Path( "{itemName}/_version")
+    @Produces({MediaType.APPLICATION_JSON,"application/javascript"})
+    @RequireApplicationAccess
+    @JSONP
+    public ApiResponse executeGetCollectionVersion(
+        @Context UriInfo ui,
+        @PathParam("itemName") PathSegment itemName,
+        @QueryParam("callback") @DefaultValue("callback") String callback ) throws Exception {
+
+        if (logger.isTraceEnabled()){
+            logger.trace( "CollectionResource.executeGetCollectionVersion" );
+        }
+
+        if (!Application.isCustomCollectionName(itemName.getPath())) {
+            throw new IllegalArgumentException(
+                "Built-in collections are not versioned."
+            );
+        }
+
+        addItemToServiceContext( ui, itemName );
+
+        UUID applicationId = getApplicationId();
+
+        String currentVersion = emf.getEntityManager(applicationId).getCollectionVersion(itemName.getPath());
+
+        ApiResponse response = createApiResponse();
+        response.setAction("get");
+        response.setApplication(emf.getEntityManager( applicationId ).getApplication());
+        Map<String,Object> data = new HashMap<>();
+        data.put("collectionName",itemName.getPath());
+        data.put("version",currentVersion);
+        response.setData(data);
+
+        if (logger.isTraceEnabled()) {
+            logger.trace("CollectionResource.executeGetCollectionVersion() sending response");
         }
 
         return response;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b6d14069/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionClearTest.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionClearTest.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionClearTest.java
new file mode 100644
index 0000000..e40c193
--- /dev/null
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionClearTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.rest.applications.collection;
+
+
+import org.apache.usergrid.rest.test.resource.AbstractRestIT;
+import org.apache.usergrid.rest.test.resource.model.ApiResponse;
+import org.apache.usergrid.rest.test.resource.model.Collection;
+import org.apache.usergrid.rest.test.resource.model.Entity;
+import org.apache.usergrid.rest.test.resource.model.QueryParameters;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests collection clear functionality.
+ */
+
+public class CollectionClearTest extends AbstractRestIT {
+
+    private static final Logger logger = LoggerFactory.getLogger(CollectionClearTest.class);
+
+
+    /**
+     * Creates entities, clears the collection via POST _clear, and checks the version and visible entities at each step.
+     * @throws Exception declared for the REST test-client calls; nothing in the body throws it explicitly
+     */
+    @Test
+    public void collectionDelete() throws Exception {
+
+        String collectionName = "children";
+        int numEntities = 10;
+        String namePrefix = "child";
+
+        int numEntitiesAfterClear = 5;
+        String namePrefixAfterClear = "abc";
+
+        // before any clear, the _version endpoint is expected to report an empty-string version
+        ApiResponse tempResponse = this.app().collection(collectionName).collection("_version").get().getResponse();
+        LinkedHashMap dataMap = (LinkedHashMap)tempResponse.getData();
+        assertEquals("", dataMap.get("version"));
+        assertEquals(collectionName, dataMap.get("collectionName"));
+
+        createEntities( collectionName, namePrefix, numEntities );
+
+        // retrieve entities; the limit is numEntities + 1 so an unexpected extra entity would show up in the count
+        QueryParameters parms = new QueryParameters().setLimit( numEntities + 1 ).setQuery("order by created asc");
+        List<Entity> entities = retrieveEntities(collectionName, namePrefix, parms, numEntities, false);
+        assertEquals(numEntities, entities.size());
+
+        // clear the collection: POST to _clear with confirm_collection_name set to the collection's own name
+        Map<String, Object> payload = new HashMap<String, Object>();
+        parms = new QueryParameters().setKeyValue("confirm_collection_name", collectionName);
+        tempResponse = this.app().collection(collectionName).collection("_clear").post(true, payload, parms);
+
+        // verify collection version has changed (it is no longer the empty string)
+        tempResponse = this.app().collection(collectionName).collection("_version").get().getResponse();
+        dataMap = (LinkedHashMap)tempResponse.getData();
+        String newVersion = (String)dataMap.get("version");
+        assertNotEquals("", newVersion);
+        assertEquals(collectionName, dataMap.get("collectionName"));
+
+        // validate that 0 entities left (parms here is still the confirm_collection_name object; no limit/query was set on it)
+        List<Entity> entitiesAfterClear = retrieveEntities(collectionName, namePrefix, parms, 0, true);
+        assertEquals(0, entitiesAfterClear.size());
+
+        // insert more entities using same collectionName
+        createEntities( collectionName, namePrefixAfterClear, numEntitiesAfterClear );
+
+        // validate correct number of entities
+        parms = new QueryParameters().setLimit( numEntitiesAfterClear + 1 ).setQuery("order by created asc");
+        List<Entity> newEntities = retrieveEntities(collectionName, namePrefixAfterClear, parms, numEntitiesAfterClear, false);
+        assertEquals(numEntitiesAfterClear, newEntities.size());
+
+        // inserting entities must not change the collection version: it still equals the post-clear version
+        tempResponse = this.app().collection(collectionName).collection("_version").get().getResponse();
+        dataMap = (LinkedHashMap)tempResponse.getData();
+        assertEquals(newVersion, dataMap.get("version"));
+        assertEquals(collectionName, dataMap.get("collectionName"));
+    }
+
+
+    /**
+     * Creates numOfEntities entities named namePrefix + 1 .. namePrefix + numOfEntities, posts each one to the
+     * collection specified with collectionName, then calls waitForQueueDrainAndRefreshIndex() before returning.
+     * @param collectionName collection to post the entities to
+     * @param namePrefix prefix of each entity's "name"; the 1-based sequence number is appended
+     * @param numOfEntities number of entities to create
+     * @return the locally built entities (not the server responses), in creation order
+     */
+    public List<Entity> createEntities(String collectionName, String namePrefix, int numOfEntities ){
+        List<Entity> entities = new LinkedList<>(  );
+
+        for ( int i = 1; i <= numOfEntities; i++ ) {
+            Map<String, Object> entityPayload = new HashMap<String, Object>();
+            entityPayload.put( "name", namePrefix + String.valueOf( i ) );
+            entityPayload.put( "num", i );
+
+            Entity entity = new Entity( entityPayload );
+            entities.add( entity );
+            this.app().collection( collectionName ).post( entity );
+
+            if ( i % 100 == 0){
+                logger.info("created {} entities", i);
+            }
+        }
+        logger.info("created {} total entities", numOfEntities);
+
+        this.waitForQueueDrainAndRefreshIndex();
+
+        return entities;
+    }
+
+    /**
+     * Pages through a collection and checks that every returned entity's name is namePrefix followed by the
+     * expected sequence number, and that exactly numOfEntities entities come back.
+     * @param collectionName collection to read
+     * @param namePrefix prefix the entity names are expected to start with
+     * @param parms query parameters used for the first request and for every next-page request
+     * @param numOfEntities number of entities expected in total
+     * @param reverseOrder if true, names are expected to count down from numOfEntities; otherwise up from 1
+     * @return the entities in the order they were returned
+     */
+    public List<Entity> retrieveEntities(String collectionName, String namePrefix, QueryParameters parms, int numOfEntities, boolean reverseOrder){
+        List<Entity> entities = new LinkedList<>(  );
+        Collection testCollection = this.app().collection( collectionName ).get(parms, true);
+
+        int entityNum = reverseOrder ? numOfEntities : 1;
+        while (testCollection.getCursor() != null) {
+            while (testCollection.hasNext()) {
+                Entity returnedEntity = testCollection.next();
+                assertEquals(namePrefix + String.valueOf(entityNum), returnedEntity.get("name"));
+                entities.add(returnedEntity);
+                if (reverseOrder) {
+                    entityNum--;
+                } else {
+                    entityNum++;
+                }
+            }
+
+            testCollection = this.app().collection(collectionName).getNextPage(testCollection, parms, true);
+        }
+
+        // handle left over entities: the page on which getCursor() is null is not drained by the loop above
+        while (testCollection.hasNext()) {
+            Entity returnedEntity = testCollection.next();
+            assertEquals(namePrefix + String.valueOf(entityNum), returnedEntity.get("name"));
+            entities.add(returnedEntity);
+            if (reverseOrder) {
+                entityNum--;
+            } else {
+                entityNum++;
+            }
+        }
+
+        // JUnit's assertEquals takes (expected, actual); keep that order so a failure message reads correctly
+        assertEquals(numOfEntities, entities.size());
+        return entities;
+    }
+
+}