You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2017/10/18 16:21:50 UTC

[08/51] [abbrv] usergrid git commit: dead letter queue handling 1. Add workers that move messages from dead letter queues back to the indexing and utility queues. 2. Change ERROR for DuplicateUniquePropertyExistsException to INFO. This happens when someo

dead letter queue handling
1. Add workers that move messages from dead letter queues back to the indexing and utility queues.
2. Change ERROR for DuplicateUniquePropertyExistsException to INFO. This happens when someone tries to create an entity with the same unique value as another entity, which is not an error that should be logged.
3. Add better logging when cluster region or region list has an invalid region.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/59354608
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/59354608
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/59354608

Branch: refs/heads/asf-site
Commit: 5935460829622ceee91d72ff8a3b44529875ae29
Parents: 3f819dc
Author: Mike Dunker <md...@google.com>
Authored: Wed Jun 14 16:00:13 2017 -0700
Committer: Mike Dunker <md...@google.com>
Committed: Wed Jun 14 16:00:13 2017 -0700

----------------------------------------------------------------------
 .../main/resources/usergrid-default.properties  |  26 +-
 .../usergrid/corepersistence/CoreModule.java    |   1 +
 .../asyncevents/AsyncEventServiceImpl.java      | 283 +++++++++++++++----
 .../index/IndexProcessorFig.java                |  30 +-
 .../persistence/queue/LegacyQueueFig.java       |   7 +-
 .../persistence/queue/LegacyQueueManager.java   |   9 +
 .../persistence/queue/LegacyQueueScope.java     |   5 +
 .../persistence/queue/LocalQueueManager.java    |  20 +-
 .../persistence/queue/guice/QueueModule.java    |   4 +
 .../queue/impl/LegacyQueueScopeImpl.java        |  25 +-
 .../queue/impl/QakkaQueueManager.java           |  13 +
 .../queue/impl/SNSQueueManagerImpl.java         |  97 +++++--
 .../services/AbstractCollectionService.java     |   9 +
 .../services/queues/ImportQueueManager.java     |   8 +
 14 files changed, 452 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/59354608/stack/config/src/main/resources/usergrid-default.properties
----------------------------------------------------------------------
diff --git a/stack/config/src/main/resources/usergrid-default.properties b/stack/config/src/main/resources/usergrid-default.properties
index 0d1a193..d505a96 100644
--- a/stack/config/src/main/resources/usergrid-default.properties
+++ b/stack/config/src/main/resources/usergrid-default.properties
@@ -305,10 +305,15 @@ cassandra.lock.writecl=LOCAL_QUORUM
 #
 #elasticsearch.refresh_search_max=10
 
-# Set the amount of time to wait when Elasticsearch rejects a requests before
+# Set the amount of time to wait when the indexing or utility queue rejects a request before
 # retrying.  This provides simple backpressure. (in milliseconds)
 #
-#elasticsearch.rejected_retry_wait
+#elasticsearch.rejected_retry_wait=1000
+
+# Set the amount of time to wait when the indexing or utility dead letter queue rejects a request before
+# retrying.  This provides simple backpressure. (in milliseconds)
+#
+#elasticsearch.deadletter.rejected_retry_wait=2000
 
 
 
@@ -332,18 +337,29 @@ cassandra.lock.writecl=LOCAL_QUORUM
 #
 #usergrid.use.default.queue=false
 
-# The number of worker threads used to read index write requests from the queue.
+# The number of worker threads used to read index write requests from the indexing queue.
 #
 #elasticsearch.worker_count=8
 
+# The number of worker threads used to read index write requests from the utility queue.
+#
+#elasticsearch.worker_count_utility=2
+
+# The number of worker threads used to read dead letter messages from the indexing dead letter queue.
+#
+#elasticsearch.worker_count_deadletter=1
+
+# The number of worker threads used to read dead letter messages from the utility dead letter queue.
+#
+#elasticsearch.worker_count_utility_deadletter=1
+
 # Set the number of worker threads used for processing index write requests to
 # Elasticsearch from the buffer.
 #
 #index.flush.workers=10
 
 # Set the implementation to use for queuing in Usergrid.
-# Valid values: TEST, LOCAL, SQS, SNS
-# NOTE: SQS and SNS equate to the same implementation of Amazon queue services.
+# Valid values: LOCAL, DISTRIBUTED, DISTRIBUTED_SNS
 #
 #elasticsearch.queue_impl=LOCAL
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/59354608/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index af297f2..d4cf8cb 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -44,6 +44,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.guice.GraphModule;
 import org.apache.usergrid.persistence.graph.serialization.impl.migration.GraphNode;
 import org.apache.usergrid.persistence.index.guice.IndexModule;
+import org.apache.usergrid.persistence.queue.LegacyQueueFig;
 import org.safehaus.guicyfig.GuicyFigModule;
 
 import java.util.Properties;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/59354608/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 75d2ce0..7c33969 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -74,6 +74,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.commons.lang.StringUtils.indexOf;
 import static org.apache.commons.lang.StringUtils.isNotEmpty;
 
 
@@ -103,10 +104,13 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     public int MAX_TAKE = 10;
     public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars
     public static final String QUEUE_NAME_UTILITY = "utility"; //keep this short as AWS limits queue name size to 80 chars
+    public static final String DEAD_LETTER_SUFFIX = "_dead";
 
 
-    private final LegacyQueueManager queue;
+    private final LegacyQueueManager indexQueue;
     private final LegacyQueueManager utilityQueue;
+    private final LegacyQueueManager indexQueueDead;
+    private final LegacyQueueManager utilityQueueDead;
     private final IndexProcessorFig indexProcessorFig;
     private final LegacyQueueFig queueFig;
     private final IndexProducer indexProducer;
@@ -128,6 +132,8 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     private final Counter indexErrorCounter;
     private final AtomicLong counter = new AtomicLong();
     private final AtomicLong counterUtility = new AtomicLong();
+    private final AtomicLong counterIndexDead = new AtomicLong();
+    private final AtomicLong counterUtilityDead = new AtomicLong();
     private final AtomicLong inFlight = new AtomicLong();
     private final Histogram messageCycle;
     private final MapManager esMapPersistence;
@@ -162,14 +168,22 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
         this.rxTaskScheduler = rxTaskScheduler;
 
-        LegacyQueueScope queueScope =
+        LegacyQueueScope indexQueueScope =
             new LegacyQueueScopeImpl(QUEUE_NAME, LegacyQueueScope.RegionImplementation.ALL);
 
         LegacyQueueScope utilityQueueScope =
             new LegacyQueueScopeImpl(QUEUE_NAME_UTILITY, LegacyQueueScope.RegionImplementation.ALL);
 
-        this.queue = queueManagerFactory.getQueueManager(queueScope);
+        LegacyQueueScope indexQueueDeadScope =
+            new LegacyQueueScopeImpl(QUEUE_NAME, LegacyQueueScope.RegionImplementation.ALL, true);
+
+        LegacyQueueScope utilityQueueDeadScope =
+            new LegacyQueueScopeImpl(QUEUE_NAME_UTILITY, LegacyQueueScope.RegionImplementation.ALL, true);
+
+        this.indexQueue = queueManagerFactory.getQueueManager(indexQueueScope);
         this.utilityQueue = queueManagerFactory.getQueueManager(utilityQueueScope);
+        this.indexQueueDead = queueManagerFactory.getQueueManager(indexQueueDeadScope);
+        this.utilityQueueDead = queueManagerFactory.getQueueManager(utilityQueueDeadScope);
 
         this.indexProcessorFig = indexProcessorFig;
         this.queueFig = queueFig;
@@ -201,7 +215,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
         try {
             //signal to SQS
-            this.queue.sendMessageToLocalRegion( operation );
+            this.indexQueue.sendMessageToLocalRegion( operation );
         } catch (IOException e) {
             throw new RuntimeException("Unable to queue message", e);
         } finally {
@@ -218,7 +232,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
             if (forUtilityQueue) {
                 this.utilityQueue.sendMessageToAllRegions(operation);
             } else {
-                this.queue.sendMessageToAllRegions(operation);
+                this.indexQueue.sendMessageToAllRegions(operation);
             }
         }
         catch ( IOException e ) {
@@ -237,7 +251,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
             if( forUtilityQueue ){
                 this.utilityQueue.sendMessages(operations);
             }else{
-                this.queue.sendMessages(operations);
+                this.indexQueue.sendMessages(operations);
             }
         } catch (IOException e) {
             throw new RuntimeException("Unable to queue message", e);
@@ -264,7 +278,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         final Timer.Context timer = this.readTimer.time();
 
         try {
-            return queue.getMessages(MAX_TAKE, AsyncEvent.class);
+            return indexQueue.getMessages(MAX_TAKE, AsyncEvent.class);
         }
         finally {
             //stop our timer
@@ -288,6 +302,38 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         }
     }
 
+    /**
+     * Take messages from the index dead letter queue
+     */
+    private List<LegacyQueueMessage> takeFromIndexDeadQueue() {
+
+        final Timer.Context timer = this.readTimer.time();
+
+        try {
+            return indexQueueDead.getMessages(MAX_TAKE, AsyncEvent.class);
+        }
+        finally {
+            //stop our timer
+            timer.stop();
+        }
+    }
+
+    /**
+     * Take messages from the utility dead letter queue
+     */
+    private List<LegacyQueueMessage> takeFromUtilityDeadQueue() {
+
+        final Timer.Context timer = this.readTimer.time();
+
+        try {
+            return utilityQueueDead.getMessages(MAX_TAKE, AsyncEvent.class);
+        }
+        finally {
+            //stop our timer
+            timer.stop();
+        }
+    }
+
 
     /**
      * Ack message
@@ -300,7 +346,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
             for ( LegacyQueueMessage legacyQueueMessage : messages ) {
                 try {
-                    queue.commitMessage( legacyQueueMessage );
+                    indexQueue.commitMessage( legacyQueueMessage );
                     inFlight.decrementAndGet();
 
                 } catch ( Throwable t ) {
@@ -331,6 +377,28 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     }
 
     /**
+     * ack messages in index dead letter queue
+     */
+    public void ackIndexDeadQueue(final List<LegacyQueueMessage> messages) {
+        try{
+            indexQueueDead.commitMessages( messages );
+        }catch(Exception e){
+            throw new RuntimeException("Unable to ack messages", e);
+        }
+    }
+
+    /**
+     * ack messages in utility dead letter queue
+     */
+    public void ackUtilityDeadQueue(final List<LegacyQueueMessage> messages) {
+        try{
+            utilityQueueDead.commitMessages( messages );
+        }catch(Exception e){
+            throw new RuntimeException("Unable to ack messages", e);
+        }
+    }
+
+    /**
      * calls the event handlers and returns a result with information on whether
      * it needs to be ack'd and whether it needs to be indexed
      * @param messages
@@ -744,7 +812,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
     @Override
     public long getQueueDepth() {
-        return queue.getQueueDepth();
+        return indexQueue.getQueueDepth();
     }
 
     @Override
@@ -806,16 +874,26 @@ public class AsyncEventServiceImpl implements AsyncEventService {
      * Loop through and start the workers
      */
     public void start() {
-        final int count = indexProcessorFig.getWorkerCount();
+        final int indexCount = indexProcessorFig.getWorkerCount();
         final int utilityCount = indexProcessorFig.getWorkerCountUtility();
+        final int indexDeadCount = indexProcessorFig.getWorkerCountDeadLetter();
+        final int utilityDeadCount = indexProcessorFig.getWorkerCountUtilityDeadLetter();
 
-        for (int i = 0; i < count; i++) {
+        for (int i = 0; i < indexCount; i++) {
             startWorker(QUEUE_NAME);
         }
 
         for (int i = 0; i < utilityCount; i++) {
             startWorker(QUEUE_NAME_UTILITY);
         }
+
+        for (int i = 0; i < indexDeadCount; i++) {
+            startDeadQueueWorker(QUEUE_NAME);
+        }
+
+        for (int i = 0; i < utilityDeadCount; i++) {
+            startDeadQueueWorker(QUEUE_NAME_UTILITY);
+        }
     }
 
 
@@ -840,47 +918,166 @@ public class AsyncEventServiceImpl implements AsyncEventService {
             boolean isUtilityQueue = isNotEmpty(type) && type.toLowerCase().contains(QUEUE_NAME_UTILITY.toLowerCase());
 
             Observable<List<LegacyQueueMessage>> consumer =
+                Observable.create( new Observable.OnSubscribe<List<LegacyQueueMessage>>() {
+                    @Override
+                    public void call( final Subscriber<? super List<LegacyQueueMessage>> subscriber ) {
+
+                        //name our thread so it's easy to see
+                        long threadNum = isUtilityQueue ?
+                            counterUtility.incrementAndGet() : counter.incrementAndGet();
+                        Thread.currentThread().setName( "QueueConsumer_" + type+ "_" + threadNum );
+
+                        List<LegacyQueueMessage> drainList = null;
+
+                        do {
+                            try {
+                                if ( isUtilityQueue ){
+                                    drainList = takeFromUtilityQueue();
+                                }else{
+                                    drainList = take();
+
+                                }
+                                //emit our list in its entirety to hand off to a worker pool
+                                subscriber.onNext(drainList);
+
+                                //take since  we're in flight
+                                inFlight.addAndGet( drainList.size() );
+
+                            } catch ( Throwable t ) {
+
+                                final long sleepTime = indexProcessorFig.getFailureRetryTime();
+
+                                // there might be an error here during tests, just clean the cache
+                                indexQueue.clearQueueNameCache();
+
+                                if ( t instanceof InvalidQueryException ) {
+
+                                    // don't fill up log with exceptions when keyspace and column
+                                    // families are not ready during bootstrap/setup
+                                    logger.warn( "Failed to dequeue due to '{}'. Sleeping for {} ms",
+                                        t.getMessage(), sleepTime );
+
+                                } else {
+                                    logger.error( "Failed to dequeue. Sleeping for {} ms", sleepTime, t);
+                                }
+
+                                if ( drainList != null ) {
+                                    inFlight.addAndGet( -1 * drainList.size() );
+                                }
+
+                                try { Thread.sleep( sleepTime ); } catch ( InterruptedException ie ) {}
+
+                                indexErrorCounter.inc();
+                            }
+                        }
+                        while ( true );
+                    }
+                } )        //this won't block our read loop, just reads and proceeds
+                    .flatMap( sqsMessages -> {
+
+                        //do this on a different schedule, and introduce concurrency
+                        // with flatmap for faster processing
+                        return Observable.just( sqsMessages )
+
+                            .map( messages -> {
+                                if ( messages == null || messages.size() == 0 ) {
+                                    // no messages came from the queue, move on
+                                    return null;
+                                }
+
+                                try {
+                                    // process the messages
+                                    List<IndexEventResult> indexEventResults =
+                                        callEventHandlers( messages );
+
+                                    // submit the processed messages to index producer
+                                    List<LegacyQueueMessage> messagesToAck =
+                                        submitToIndex( indexEventResults, isUtilityQueue );
+
+                                    if ( messagesToAck.size() < messages.size() ) {
+                                        logger.warn(
+                                            "Missing {} message(s) from index processing",
+                                            messages.size() - messagesToAck.size() );
+                                    }
+
+                                    // ack each message if making it to this point
+                                    if( messagesToAck.size() > 0 ){
+
+                                        if ( isUtilityQueue ){
+                                            ackUtilityQueue( messagesToAck );
+                                        }else{
+                                            ack( messagesToAck );
+                                        }
+                                    }
+
+                                    return messagesToAck;
+                                }
+                                catch ( Exception e ) {
+                                    logger.error( "Failed to ack messages", e );
+                                    return null;
+                                    //do not rethrow so we can process all of them
+                                }
+                            } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() );
+
+                        //end flatMap
+                    }, indexProcessorFig.getEventConcurrencyFactor() );
+
+            //start in the background
+
+            final Subscription subscription = consumer.subscribeOn(Schedulers.newThread()).subscribe();
+
+            subscriptions.add(subscription);
+        }
+    }
+
+
+    private void startDeadQueueWorker(final String type) {
+        Preconditions.checkNotNull(type, "Worker type required");
+        synchronized (mutex) {
+
+            boolean isUtilityDeadQueue = isNotEmpty(type) && type.toLowerCase().contains(QUEUE_NAME_UTILITY.toLowerCase());
+
+            Observable<List<LegacyQueueMessage>> consumer =
                     Observable.create( new Observable.OnSubscribe<List<LegacyQueueMessage>>() {
                         @Override
                         public void call( final Subscriber<? super List<LegacyQueueMessage>> subscriber ) {
 
                             //name our thread so it's easy to see
-                            long threadNum = isUtilityQueue ?
-                                counterUtility.incrementAndGet() : counter.incrementAndGet();
-                            Thread.currentThread().setName( "QueueConsumer_" + type+ "_" + threadNum );
+                            long threadNum = isUtilityDeadQueue ?
+                                counterUtilityDead.incrementAndGet() : counterIndexDead.incrementAndGet();
+                            Thread.currentThread().setName( "QueueDeadLetterConsumer_" + type+ "_" + threadNum );
 
                             List<LegacyQueueMessage> drainList = null;
 
                             do {
                                 try {
-                                    if ( isUtilityQueue ){
-                                        drainList = takeFromUtilityQueue();
+                                    if ( isUtilityDeadQueue ){
+                                        drainList = takeFromUtilityDeadQueue();
                                     }else{
-                                        drainList = take();
-
+                                        drainList = takeFromIndexDeadQueue();
                                     }
                                     //emit our list in it's entity to hand off to a worker pool
-                                        subscriber.onNext(drainList);
+                                    subscriber.onNext(drainList);
 
                                     //take since  we're in flight
                                     inFlight.addAndGet( drainList.size() );
 
                                 } catch ( Throwable t ) {
 
-                                    final long sleepTime = indexProcessorFig.getFailureRetryTime();
+                                    final long sleepTime = indexProcessorFig.getDeadLetterFailureRetryTime();
 
                                     // there might be an error here during tests, just clean the cache
-                                    queue.clearQueueNameCache();
+                                    indexQueueDead.clearQueueNameCache();
 
                                     if ( t instanceof InvalidQueryException ) {
 
                                         // don't fill up log with exceptions when keyspace and column
                                         // families are not ready during bootstrap/setup
-                                        logger.warn( "Failed to dequeue due to '{}'. Sleeping for {} ms",
+                                        logger.warn( "Failed to dequeue dead letters due to '{}'. Sleeping for {} ms",
                                             t.getMessage(), sleepTime );
 
                                     } else {
-                                        logger.error( "Failed to dequeue. Sleeping for {} ms", sleepTime, t);
+                                        logger.error( "Failed to dequeue dead letters. Sleeping for {} ms", sleepTime, t);
                                     }
 
                                     if ( drainList != null ) {
@@ -888,8 +1085,6 @@ public class AsyncEventServiceImpl implements AsyncEventService {
                                     }
 
                                     try { Thread.sleep( sleepTime ); } catch ( InterruptedException ie ) {}
-
-                                    indexErrorCounter.inc();
                                 }
                             }
                             while ( true );
@@ -908,31 +1103,23 @@ public class AsyncEventServiceImpl implements AsyncEventService {
                                                  }
 
                                                  try {
-                                                     // process the messages
-                                                     List<IndexEventResult> indexEventResults =
-                                                         callEventHandlers( messages );
-
-                                                     // submit the processed messages to index producer
-                                                     List<LegacyQueueMessage> messagesToAck =
-                                                         submitToIndex( indexEventResults, isUtilityQueue );
-
-                                                     if ( messagesToAck.size() < messages.size() ) {
-                                                         logger.warn(
-                                                             "Missing {} message(s) from index processing",
-                                                            messages.size() - messagesToAck.size() );
+                                                     // put the dead letter messages back in the appropriate queue
+                                                     LegacyQueueManager returnQueue = null;
+                                                     if (isUtilityDeadQueue) {
+                                                         logger.warn("Utility dead queue message count: {}", messages.size());
+                                                         returnQueue = utilityQueue;
+                                                     } else {
+                                                         logger.warn("Index dead queue message count: {}", messages.size());
+                                                         returnQueue = indexQueue;
                                                      }
-
-                                                     // ack each message if making it to this point
-                                                     if( messagesToAck.size() > 0 ){
-
-                                                         if ( isUtilityQueue ){
-                                                             ackUtilityQueue( messagesToAck );
-                                                         }else{
-                                                             ack( messagesToAck );
-                                                         }
+                                                     List<LegacyQueueMessage> successMessages = returnQueue.sendQueueMessages(messages);
+                                                     if (isUtilityDeadQueue) {
+                                                         ackUtilityDeadQueue(successMessages);
+                                                     } else {
+                                                         ackIndexDeadQueue(successMessages);
                                                      }
 
-                                                     return messagesToAck;
+                                                     return messages;
                                                  }
                                                  catch ( Exception e ) {
                                                      logger.error( "Failed to ack messages", e );
@@ -1042,7 +1229,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
     public String getQueueManagerClass() {
 
-        return queue.getClass().getSimpleName();
+        return indexQueue.getClass().getSimpleName();
 
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/59354608/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index 45dff1c..7eecf04 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -34,10 +34,16 @@ public interface IndexProcessorFig extends GuicyFig {
 
     String FAILURE_REJECTED_RETRY_WAIT_TIME = "elasticsearch.rejected_retry_wait";
 
+    String DLQ_FAILURE_REJECTED_RETRY_WAIT_TIME = "elasticsearch.deadletter.rejected_retry_wait";
+
     String ELASTICSEARCH_WORKER_COUNT = "elasticsearch.worker_count";
 
     String ELASTICSEARCH_WORKER_COUNT_UTILITY = "elasticsearch.worker_count_utility";
 
+    String ELASTICSEARCH_WORKER_COUNT_DEADLETTER = "elasticsearch.worker_count_deadletter";
+
+    String ELASTICSEARCH_WORKER_COUNT_UTILITY_DEADLETTER = "elasticsearch.worker_count_utility_deadletter";
+
     String EVENT_CONCURRENCY_FACTOR = "event.concurrency.factor";
 
     String ELASTICSEARCH_QUEUE_IMPL = "elasticsearch.queue_impl";
@@ -50,13 +56,21 @@ public interface IndexProcessorFig extends GuicyFig {
 
 
     /**
-     * Set the amount of time to wait when Elasticsearch rejects a requests before
+     * Set the amount of time to wait when the indexing or utility queue rejects a request before
      * retrying.  This provides simple back pressure. (in milliseconds)
      */
     @Default("1000")
     @Key(FAILURE_REJECTED_RETRY_WAIT_TIME)
     long getFailureRetryTime();
 
+    /**
+     * Set the amount of time to wait when the indexing or utility dead letter queue rejects a request before
+     * retrying.  This provides simple back pressure. (in milliseconds)
+     */
+    @Default("2000")
+    @Key(DLQ_FAILURE_REJECTED_RETRY_WAIT_TIME)
+    long getDeadLetterFailureRetryTime();
+
 
     /**
      * Set the visibility timeout for messages received from the queue. (in milliseconds).
@@ -91,6 +105,20 @@ public interface IndexProcessorFig extends GuicyFig {
     int getWorkerCountUtility();
 
     /**
+     * The number of worker threads used to read dead messages from the index dead letter queue and reload them into the index queue.
+     */
+    @Default("1")
+    @Key(ELASTICSEARCH_WORKER_COUNT_DEADLETTER)
+    int getWorkerCountDeadLetter();
+
+    /**
+     * The number of worker threads used to read dead messages from the utility dead letter queue and reload them into the utility queue.
+     */
+    @Default("1")
+    @Key(ELASTICSEARCH_WORKER_COUNT_UTILITY_DEADLETTER)
+    int getWorkerCountUtilityDeadLetter();
+
+    /**
      * Set the implementation to use for queuing.
      * Valid values: TEST, LOCAL, SQS, SNS
      * NOTE: SQS and SNS equate to the same implementation of Amazon queue services.

http://git-wip-us.apache.org/repos/asf/usergrid/blob/59354608/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
index 40f8eea..6fe96dd 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
@@ -14,11 +14,14 @@ public interface LegacyQueueFig extends GuicyFig {
      * http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html*
      */
 
+    String USERGRID_CLUSTER_REGION_LIST = "usergrid.cluster.region.list";
+    String USERGRID_CLUSTER_REGION_LOCAL = "usergrid.cluster.region.local";
+
 
     /**
      * Primary region to use for Amazon queues.
      */
-    @Key( "usergrid.cluster.region.local" )
+    @Key( USERGRID_CLUSTER_REGION_LOCAL )
     @Default("us-east-1")
     String getPrimaryRegion();
 
@@ -34,7 +37,7 @@ public interface LegacyQueueFig extends GuicyFig {
      * Comma-separated list of one or more Amazon regions to use if multiregion
      * is set to true.
      */
-    @Key( "usergrid.cluster.region.list" )
+    @Key( USERGRID_CLUSTER_REGION_LIST )
     @Default("us-east-1")
     String getRegionList();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/59354608/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java
index 117ce1c..f153610 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java
@@ -20,6 +20,7 @@ package org.apache.usergrid.persistence.queue;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
+import java.util.Set;
 
 /**ctor
  * Manages queues for usergrid.  Current implementation is sqs based.
@@ -68,6 +69,14 @@ public interface LegacyQueueManager {
     void sendMessages(List bodies) throws IOException;
 
     /**
+     * Send a batch of existing queue messages to the queue.
+     * @param queueMessages the messages to send
+     * @throws IOException
+     * @return the list of messages from queueMessages that were successfully sent
+     */
+    List<LegacyQueueMessage> sendQueueMessages(List<LegacyQueueMessage> queueMessages) throws IOException;
+
+    /**
      * send a message to queue
      * @param body
      * @throws IOException

http://git-wip-us.apache.org/repos/asf/usergrid/blob/59354608/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueScope.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueScope.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueScope.java
index 3856738..6882718 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueScope.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueScope.java
@@ -40,4 +40,9 @@ public interface LegacyQueueScope {
      * Get the Usergrid region enum
      */
     RegionImplementation getRegionImplementation();
+
+    /**
+     * Is this for the dead letter queue?
+     */
+    boolean isDeadLetterQueue();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/59354608/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
index 7a793b4..cbba0b1 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
@@ -27,9 +27,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
@@ -92,6 +90,22 @@ public class LocalQueueManager implements LegacyQueueManager {
         }
     }
 
+    @Override
+    public List<LegacyQueueMessage> sendQueueMessages(List<LegacyQueueMessage> queueMessages) throws IOException {
+        List<LegacyQueueMessage> successMessages = new ArrayList<>();
+        for(LegacyQueueMessage queueMessage : queueMessages){
+            String uuid = UUID.randomUUID().toString();
+            try {
+                LegacyQueueMessage msg = new LegacyQueueMessage(uuid, "handle_" + uuid, queueMessage.getBody(), "put type here");
+                queue.put(msg);
+                successMessages.add(queueMessage);
+            }catch (InterruptedException ie){
+                throw new RuntimeException(ie);
+            }
+        }
+        return successMessages;
+    }
+
 
     @Override
     public <T extends Serializable> void sendMessageToLocalRegion(final T body ) throws IOException {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/59354608/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
index b38eeb8..a485f55 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
@@ -27,18 +27,22 @@ import org.apache.usergrid.persistence.queue.impl.QakkaQueueManager;
 import org.apache.usergrid.persistence.queue.impl.QueueManagerFactoryImpl;
 import org.apache.usergrid.persistence.queue.impl.SNSQueueManagerImpl;
 import org.safehaus.guicyfig.GuicyFigModule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * Simple module for wiring our collection api
  */
 public class QueueModule extends AbstractModule {
+    private static final Logger logger = LoggerFactory.getLogger( QueueModule.class );
 
     private LegacyQueueManager.Implementation implementation;
 
 
     public QueueModule( String queueManagerType ) {
 
+        logger.info("QueueManagerType={}", queueManagerType);
         if ( "DISTRIBUTED_SNS".equals( queueManagerType ) ) {
             this.implementation = LegacyQueueManager.Implementation.DISTRIBUTED_SNS;
         }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/59354608/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/LegacyQueueScopeImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/LegacyQueueScopeImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/LegacyQueueScopeImpl.java
index 9dd0421..a3e87e1 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/LegacyQueueScopeImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/LegacyQueueScopeImpl.java
@@ -23,10 +23,18 @@ public class LegacyQueueScopeImpl implements LegacyQueueScope {
 
     private final String name;
     private final RegionImplementation regionImpl;
+    private final boolean isDeadLetterQueue;
 
     public LegacyQueueScopeImpl(final String name, final RegionImplementation regionImpl) {
         this.name = name;
         this.regionImpl = regionImpl;
+        this.isDeadLetterQueue = false;
+    }
+
+    public LegacyQueueScopeImpl(final String name, final RegionImplementation regionImpl, final boolean isDeadLetterQueue) {
+        this.name = name;
+        this.regionImpl = regionImpl;
+        this.isDeadLetterQueue = isDeadLetterQueue;
     }
 
     @Override
@@ -38,6 +46,9 @@ public class LegacyQueueScopeImpl implements LegacyQueueScope {
     public RegionImplementation getRegionImplementation() {return regionImpl;}
 
     @Override
+    public boolean isDeadLetterQueue() {return isDeadLetterQueue;}
+
+    @Override
     public boolean equals( final Object o ) {
         if ( this == o ) {
             return true;
@@ -52,6 +63,13 @@ public class LegacyQueueScopeImpl implements LegacyQueueScope {
             return false;
         }
 
+        if ( regionImpl != queueScope.getRegionImplementation() ) {
+            return false;
+        }
+
+        if ( isDeadLetterQueue != queueScope.isDeadLetterQueue ) {
+            return false;
+        }
 
         return true;
     }
@@ -59,6 +77,11 @@ public class LegacyQueueScopeImpl implements LegacyQueueScope {
 
     @Override
     public int hashCode() {
-        return name.hashCode();
+        String deadLetter = "REGULAR";
+        if (isDeadLetterQueue) {
+            deadLetter = "DEADLETTER";
+        }
+        String hashString = name + "|" + regionImpl.name() + "|" + deadLetter;
+        return hashString.hashCode();
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/59354608/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
index b6ca429..e7fa47b 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
@@ -195,6 +195,19 @@ public class QakkaQueueManager implements LegacyQueueManager {
 
 
     @Override
+    public List<LegacyQueueMessage> sendQueueMessages( List<LegacyQueueMessage> queueMessages ) throws IOException {
+
+        List<LegacyQueueMessage> successMessages = new ArrayList<>();
+        for ( LegacyQueueMessage queueMessage : queueMessages ) {
+            sendMessageToLocalRegion( (Serializable)queueMessage.getBody() );
+            successMessages.add(queueMessage);
+        }
+
+        return successMessages;
+    }
+
+
+    @Override
     public void deleteQueue() {
         queueManager.deleteQueue( scope.getName() );
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/59354608/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index b2a7680..1370298 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -20,15 +20,13 @@ package org.apache.usergrid.persistence.queue.impl;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 
 import com.amazonaws.ClientConfiguration;
+import com.amazonaws.services.sqs.model.*;
+import com.sun.javaws.exceptions.InvalidArgumentException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,20 +52,6 @@ import com.amazonaws.services.sns.model.SubscribeRequest;
 import com.amazonaws.services.sns.model.SubscribeResult;
 import com.amazonaws.services.sqs.AmazonSQSAsyncClient;
 import com.amazonaws.services.sqs.AmazonSQSClient;
-import com.amazonaws.services.sqs.model.BatchResultErrorEntry;
-import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
-import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
-import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
-import com.amazonaws.services.sqs.model.DeleteMessageRequest;
-import com.amazonaws.services.sqs.model.DeleteQueueRequest;
-import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
-import com.amazonaws.services.sqs.model.GetQueueUrlResult;
-import com.amazonaws.services.sqs.model.Message;
-import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
-import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
-import com.amazonaws.services.sqs.model.ReceiveMessageResult;
-import com.amazonaws.services.sqs.model.SendMessageRequest;
-import com.amazonaws.services.sqs.model.SendMessageResult;
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.databind.JsonNode;
@@ -99,6 +83,7 @@ public class SNSQueueManagerImpl implements LegacyQueueManager {
     private static final ObjectMapper mapper = new ObjectMapper( JSON_FACTORY );
     private static final int MIN_CLIENT_SOCKET_TIMEOUT = 5000; // millis
     private static final int MIN_VISIBILITY_TIMEOUT = 1; //seconds
+    private static final String DEAD_LETTER_QUEUE_SUFFIX = "_dead";
 
     static {
 
@@ -133,6 +118,11 @@ public class SNSQueueManagerImpl implements LegacyQueueManager {
                     queue = new LegacyQueue( result.getQueueUrl() );
                 }
                 catch ( QueueDoesNotExistException queueDoesNotExistException ) {
+                    if (queueName.endsWith(DEAD_LETTER_QUEUE_SUFFIX)) {
+                        // don't auto-create dead letter queues
+                        logger.error("failed to get dead letter queue from service, won't create", queueDoesNotExistException);
+                        throw queueDoesNotExistException;
+                    }
                     logger.error( "Queue {} does not exist, will create", queueName );
                 }
                 catch ( Exception e ) {
@@ -251,8 +241,14 @@ public class SNSQueueManagerImpl implements LegacyQueueManager {
             for ( String regionName : regionNames ) {
 
                 regionName = regionName.trim();
-                Regions regions = Regions.fromName( regionName );
-                Region region = Region.getRegion( regions );
+                Region region = null;
+                try {
+                    Regions regions = Regions.fromName(regionName);
+                    region = Region.getRegion(regions);
+                }
+                catch (IllegalArgumentException e) {
+                    throw new IllegalArgumentException("INVALID REGION FROM CONFIGURATION " + LegacyQueueFig.USERGRID_CLUSTER_REGION_LIST + ": " + regionName, e);
+                }
 
                 AmazonSQSClient sqsClient = createSQSClient( region );
                 AmazonSNSClient snsClient = createSNSClient( region ); // do this stuff synchronously
@@ -380,19 +376,26 @@ public class SNSQueueManagerImpl implements LegacyQueueManager {
     }
 
 
-    private String getName() {
+    private String getName(final boolean isDeadLetter) {
         String name =
             clusterFig.getClusterName() + "_" + cassandraConfig.getApplicationKeyspace() + "_" + scope.getName() + "_"
                 + scope.getRegionImplementation();
+        if (isDeadLetter) {
+            name += DEAD_LETTER_QUEUE_SUFFIX;
+        }
         name = name.toLowerCase(); //user lower case values
         Preconditions.checkArgument( name.length() <= 80, "Your name must be < than 80 characters" );
 
         return name;
     }
 
+    private String getName() {
+        return getName(false);
+    }
+
 
     public LegacyQueue getReadQueue() {
-        String queueName = getName();
+        String queueName = getName(scope.isDeadLetterQueue());
 
         try {
             return readQueueUrlMap.get( queueName );
@@ -588,6 +591,43 @@ public class SNSQueueManagerImpl implements LegacyQueueManager {
 
 
     @Override
+    public List<LegacyQueueMessage> sendQueueMessages(List<LegacyQueueMessage> queueMessages) throws IOException {
+
+        List<LegacyQueueMessage> successMessages = new ArrayList<>();
+
+        if ( sqs == null ) {
+            logger.error( "SQS client is null, perhaps it failed to initialize successfully" );
+            return successMessages;
+        }
+
+        String url = getReadQueue().getUrl();
+
+        List<SendMessageBatchRequestEntry> entries = new ArrayList<>();
+
+        for (LegacyQueueMessage queueMessage : queueMessages) {
+            entries.add(new SendMessageBatchRequestEntry(queueMessage.getMessageId(), queueMessage.getStringBody()));
+        }
+
+        SendMessageBatchResult result = sqs.sendMessageBatch(url, entries);
+
+        Set<String> successIDs = new HashSet<>();
+        logger.debug("sendQueueMessages: successful: {}, failed: {}", result.getSuccessful().size(), result.getFailed().size());
+
+        for (SendMessageBatchResultEntry batchResultEntry : result.getSuccessful()) {
+            successIDs.add(batchResultEntry.getId());
+        }
+
+        for (LegacyQueueMessage queueMessage : queueMessages) {
+            if (successIDs.contains(queueMessage.getMessageId())) {
+                successMessages.add(queueMessage);
+            }
+        }
+
+        return successMessages;
+    }
+
+
+    @Override
     public <T extends Serializable> void sendMessageToLocalRegion(final T body ) throws IOException {
 
         if ( sqsAsync == null ) {
@@ -663,6 +703,7 @@ public class SNSQueueManagerImpl implements LegacyQueueManager {
         DeleteMessageBatchResult result = sqs.deleteMessageBatch( request );
 
         boolean successful = result.getFailed().size() <= 0;
+        logger.debug("commitMessages: successful: {}, failed: {}", result.getSuccessful().size(), result.getFailed().size());
 
         if ( !successful ) {
             for ( BatchResultErrorEntry failed : result.getFailed() ) {
@@ -684,8 +725,14 @@ public class SNSQueueManagerImpl implements LegacyQueueManager {
      * Get the region
      */
     private Region getRegion() {
-        Regions regions = Regions.fromName(fig.getPrimaryRegion());
-        return Region.getRegion(regions);
+        String regionName = fig.getPrimaryRegion();
+        try {
+            Regions regions = Regions.fromName(regionName);
+            return Region.getRegion(regions);
+        }
+        catch (IllegalArgumentException e) {
+            throw new IllegalArgumentException("INVALID PRIMARY REGION FROM CONFIGURATION " + LegacyQueueFig.USERGRID_CLUSTER_REGION_LOCAL + ": " + regionName, e);
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/59354608/stack/services/src/main/java/org/apache/usergrid/services/AbstractCollectionService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/AbstractCollectionService.java b/stack/services/src/main/java/org/apache/usergrid/services/AbstractCollectionService.java
index 6d99250..6cdad3b 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/AbstractCollectionService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/AbstractCollectionService.java
@@ -19,6 +19,7 @@ package org.apache.usergrid.services;
 
 import java.util.*;
 
+import org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -442,6 +443,14 @@ public class AbstractCollectionService extends AbstractService {
                     item = em.createItemInCollection( context.getOwner(), context.getCollectionName(), getEntityType(),
                             p );
                 }
+                catch (DuplicateUniquePropertyExistsException e) {
+                    // this is not an error (caller tried to create entity with a duplicate unique value)
+                    logger.info("Entity [{}] unable to be created in collection [{}] due to [{} - {}]", p, context.getCollectionName(),
+                        e.getClass().getSimpleName(), e.getMessage());
+
+                    // would be nice if status for each batch entry was returned...
+                    continue;
+                }
                 catch ( Exception e ) {
 
                     logger.error("Entity [{}] unable to be created in collection [{}] due to [{} - {}]", p, context.getCollectionName(),

http://git-wip-us.apache.org/repos/asf/usergrid/blob/59354608/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
index c00575f..f7e107d 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
@@ -24,7 +24,9 @@ package org.apache.usergrid.services.queues;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.usergrid.persistence.queue.LegacyQueueManager;
 import org.apache.usergrid.persistence.queue.LegacyQueueMessage;
@@ -65,6 +67,12 @@ public class ImportQueueManager implements LegacyQueueManager {
 
 
     @Override
+    public List<LegacyQueueMessage> sendQueueMessages(final List<LegacyQueueMessage> queueMessages ) throws IOException {
+        return new ArrayList<>();
+    }
+
+
+    @Override
     public <T extends Serializable> void sendMessageToLocalRegion(final T body ) throws IOException {
 
     }