You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2015/10/20 20:02:07 UTC

[04/12] usergrid git commit: Fixes empty payload notification issue.

Fixes empty payload notification issue.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/0326629a
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/0326629a
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/0326629a

Branch: refs/heads/2.1-release
Commit: 0326629a24cec3bd44d91810b4b8f0516c69c9b8
Parents: 3e15585
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Oct 19 13:53:30 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Oct 19 13:53:30 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 55 ++++++++++++--------
 .../asyncevents/AsyncIndexProvider.java         |  4 +-
 .../index/AmazonAsyncEventServiceTest.java      |  2 +-
 3 files changed, 36 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/0326629a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index f8ef5e7..6b2eb45 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -89,6 +89,21 @@ import rx.Subscription;
 import rx.schedulers.Schedulers;
 
 
+/**
+ * TODO: this whole class is becoming a nightmare.  We need to remove all consume logic from this class and refactor it in the following manner.
+ *
+ * 1. Produce.  Keep the code in the handle as is
+ * 2. Consume:  Move the code into a refactored system
+ * 2.1 A central dispatcher
+ * 2.2 An interface that produces an observable of type BatchOperation.  Any handler will be refactored into its own
+ *      impl that will then emit a stream of batch operations to perform
+ * 2.3 The central dispatcher will then subscribe to these events and merge them.  Handing them off to a batch handler
+ * 2.4 The batch handler will roll up the operations into a batch size, and then queue them
+ * 2.5 The receive batch handler will execute the batch operations
+ *
+ * TODO: determine how we handle errors.
+ *
+ */
 @Singleton
 public class AmazonAsyncEventService implements AsyncEventService {
 
@@ -360,7 +375,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
     public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) {
         IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(
             applicationScope );
-        offerTopic(new InitializeApplicationIndexEvent(queueFig.getPrimaryRegion(), new ReplicatedIndexLocationStrategy(indexLocationStrategy)));
+        offerTopic( new InitializeApplicationIndexEvent( queueFig.getPrimaryRegion(),
+            new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ) );
     }
 
 
@@ -503,35 +519,29 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
         final String message = esMapPersistence.getString( messageId.toString() );
 
-        String highConsistency = null;
+        final IndexOperationMessage indexOperationMessage;
 
         if(message == null){
             logger.error( "Received message with id {} to process, unable to find it, reading with higher consistency level" );
 
-            highConsistency =  esMapPersistence.getStringHighConsistency( messageId.toString() );
-
-        }
+            final String highConsistency =  esMapPersistence.getStringHighConsistency( messageId.toString() );
 
-        //read the value from the string
+            if(highConsistency == null){
+                logger.error( "Unable to find the ES batch with id {} to process at a higher consistency level" );
 
-        final IndexOperationMessage indexOperationMessage;
+                throw new RuntimeException( "Unable to find the ES batch to process with message id " + messageId );
+            }
 
-        //our original local read has it, parse it.
-        if(message != null){
-             indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class );
-        }
-        //we tried to read it at a higher consistency level and it works
-        else if (highConsistency != null){
             indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( highConsistency, IndexOperationMessage.class );
-        }
 
-        //we couldn't find it, bail
-        else{
-            logger.error( "Unable to find the ES batch with id {} to process at a higher consistency level" );
-
-            throw new RuntimeException( "Unable to find the ES batch to process with message id " + messageId );
+        } else{
+            indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class );
         }
 
+        //read the value from the string
+
+        Preconditions.checkNotNull( indexOperationMessage, "indexOperationMessage cannot be null" );
+        Preconditions.checkArgument( !indexOperationMessage.isEmpty() , "queued indexOperationMessage messages should not be empty" );
 
 
         //now execute it
@@ -728,9 +738,10 @@ public class AmazonAsyncEventService implements AsyncEventService {
             .map(result -> result.getQueueMessage().get())
             .collect(Collectors.toList());
 
-        //send the batch
-        //TODO: should retry?
-        queueIndexOperationMessage( combined );
+        //only queue it if it's not empty
+        if(!combined.isEmpty()) {
+            queueIndexOperationMessage( combined );
+        }
 
         return messagesToAck;
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0326629a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index 1649046..2bace8d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -105,10 +105,10 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
             case LOCAL:
                 return new InMemoryAsyncEventService(eventBuilder, rxTaskScheduler, indexProducer,indexProcessorFig.resolveSynchronously());
             case SQS:
-                throw new IllegalArgumentException("Configuration value of SQS is no longer allowed. Use SNS instead");
+                throw new IllegalArgumentException("Configuration value of SQS is no longer allowed. Use SNS instead with only a single region");
             case SNS:
                 return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
-                    entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler );
+                    entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler );
             default:
                 throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed");
         }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0326629a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
index 8ee47a2..625a8fd 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
@@ -93,7 +93,7 @@ public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest {
 
     @Override
     protected AsyncEventService getAsyncEventService() {
-        return  new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,  entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, rxTaskScheduler );
+        return  new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,  entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, mapManagerFactory, queueFig,  rxTaskScheduler );
     }