You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/03/11 05:47:35 UTC

[3/4] usergrid git commit: Improvements around async event processing.

Improvements around async event processing.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/84357831
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/84357831
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/84357831

Branch: refs/heads/master
Commit: 8435783104a5e01bdc0c6b257ebe29eac71765a3
Parents: 36a0161
Author: Michael Russo <mr...@apigee.com>
Authored: Thu Mar 10 20:44:40 2016 -0800
Committer: Michael Russo <mr...@apigee.com>
Committed: Thu Mar 10 20:44:40 2016 -0800

----------------------------------------------------------------------
 .../asyncevents/AsyncEventServiceImpl.java      | 103 ++++++++++---------
 .../usergrid/persistence/queue/QueueFig.java    |   2 +-
 .../persistence/queue/QueueMessage.java         |  10 ++
 .../queue/impl/SNSQueueManagerImpl.java         |   7 ++
 .../resources/usergrid-rest-deploy-context.xml  |  29 ++++--
 5 files changed, 89 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/84357831/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 200d1e6..4d78340 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -282,12 +282,18 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     private List<IndexEventResult> callEventHandlers(final List<QueueMessage> messages) {
 
         if (logger.isDebugEnabled()) {
-            logger.debug("callEventHandlers with {} message", messages.size());
+            logger.debug("callEventHandlers with {} message(s)", messages.size());
         }
 
         Stream<IndexEventResult> indexEventResults = messages.stream().map(message ->
 
         {
+            if(logger.isDebugEnabled()){
+                logger.debug("Queue message with ID {} has been received {} time(s)",
+                    message.getMessageId(),
+                    message.getReceiveCount() );
+            }
+
             AsyncEvent event = null;
             try {
                 event = (AsyncEvent) message.getBody();
@@ -305,7 +311,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
             final AsyncEvent thisEvent = event;
 
             if (logger.isDebugEnabled()) {
-                logger.debug("Processing {} event", event);
+                logger.debug("Processing event with type {}", event.getClass().getSimpleName());
             }
 
             IndexOperationMessage indexOperationMessage = null;
@@ -333,7 +339,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
                 } else {
 
-                    throw new Exception("Unknown EventType for message: "+ message.getStringBody());
+                    throw new Exception("Unknown EventType for message: "+ message.getStringBody().trim());
                 }
 
 
@@ -345,13 +351,15 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
                 // this exception is throw when we wait before trying quorum read on map persistence.
                 // return empty event result so the event's message doesn't get ack'd
-                logger.info(e.getMessage());
+                if(logger.isDebugEnabled()){
+                    logger.debug(e.getMessage());
+                }
                 return new IndexEventResult(Optional.absent(), Optional.absent(), event.getCreationTime());
 
             } catch (Exception e) {
 
                 // if the event fails to process, log and return empty message result so it doesn't get ack'd
-                logger.error("Failed to process message: {} {}", message.getMessageId(), message.getStringBody(), e);
+                logger.error("{}. Failed to process message: {}", e.getMessage(), message.getStringBody().trim() );
                 return new IndexEventResult(Optional.absent(), Optional.absent(), event.getCreationTime());
             }
         });
@@ -471,59 +479,49 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     }
 
     public IndexOperationMessage handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){
-         Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" );
 
-        final UUID messageId = elasticsearchIndexEvent.getIndexBatchId();
+        Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" );
 
+        final UUID messageId = elasticsearchIndexEvent.getIndexBatchId();
         Preconditions.checkNotNull( messageId, "messageId must not be null" );
 
 
-        //load the entity
-
         final String message = esMapPersistence.getString( messageId.toString() );
 
-        final IndexOperationMessage indexOperationMessage;
 
+        final IndexOperationMessage indexOperationMessage;
         if(message == null) {
 
-           if ( System.currentTimeMillis() > elasticsearchIndexEvent.getCreationTime() + queueFig.getLocalQuorumTimeout() ) {
+            // provide some time back pressure before performing a quorum read
+            if ( System.currentTimeMillis() > elasticsearchIndexEvent.getCreationTime() + queueFig.getLocalQuorumTimeout() ) {
 
-               logger.warn("Received message with id {} to process, unable to find it, reading with higher consistency level",
-                   messageId);
-
-               final String highConsistency = esMapPersistence.getStringHighConsistency(messageId.toString());
+                if(logger.isDebugEnabled()){
+                    logger.debug("ES batch with id {} not found, reading with strong consistency", messageId);
+                }
 
-               if (highConsistency == null) {
-                   logger.error("Unable to find the ES batch with id {} to process at a higher consistency level",
-                       messageId);
+                final String highConsistency = esMapPersistence.getStringHighConsistency(messageId.toString());
+                if (highConsistency == null) {
 
-                   throw new RuntimeException("Unable to find the ES batch to process with message id " + messageId);
+                   throw new RuntimeException("ES batch with id "+messageId+" not found when reading with strong consistency");
                }
 
                indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString(highConsistency, IndexOperationMessage.class);
 
-           } else{
+           } else {
 
                throw new IndexDocNotFoundException(elasticsearchIndexEvent.getIndexBatchId());
 
            }
 
-        } else{
+        } else {
+
             indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class );
         }
 
-        initializeEntityIndexes(indexOperationMessage);
-
-        //NOTE that we intentionally do NOT delete from the map.  We can't know when all regions have consumed the message
-        //so we'll let compaction on column expiration handle deletion
-
-        //read the value from the string
-
-        Preconditions.checkNotNull( indexOperationMessage, "indexOperationMessage cannot be null" );
-        Preconditions.checkArgument( !indexOperationMessage.isEmpty() , "queued indexOperationMessage messages should not be empty" );
 
+        // always do a check to ensure the indexes are initialized for the index requests
+        initializeEntityIndexes(indexOperationMessage);
 
-        //now execute it
         return indexOperationMessage;
 
     }
@@ -694,30 +692,31 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
                                              .map( messages -> {
                                                  if ( messages == null || messages.size() == 0 ) {
+                                                     // no messages came from the queue, move on
                                                      return null;
                                                  }
 
                                                  try {
+                                                     // process the messages
                                                      List<IndexEventResult> indexEventResults = callEventHandlers( messages );
+
+                                                     // submit the processed messages to index producer
                                                      List<QueueMessage> messagesToAck = submitToIndex( indexEventResults );
 
-                                                     if ( messagesToAck == null || messagesToAck.size() == 0 ) {
-                                                         logger.error(
-                                                             "No messages came back from the queue operation, should have seen {} messages",
-                                                                 messages.size() );
-                                                         return messagesToAck;
+                                                     if ( messagesToAck.size() < messages.size() ) {
+                                                         logger.warn( "Missing {} message(s) from index processing",
+                                                            messages.size() - messagesToAck.size() );
                                                      }
 
-                                                     if ( messagesToAck.size() < messages.size() ) {
-                                                         logger.error( "Missing messages from queue post operation",
-                                                             messages, messagesToAck );
+                                                     // ack each message if making it to this point
+                                                     if( messagesToAck.size() > 0 ){
+                                                         ack( messagesToAck );
                                                      }
-                                                     //ack each message, but only if we didn't error.
-                                                     ack( messagesToAck );
+
                                                      return messagesToAck;
                                                  }
                                                  catch ( Exception e ) {
-                                                     logger.error( "failed to ack messages to sqs", e );
+                                                     logger.error( "Failed to ack messages", e );
                                                      return null;
                                                      //do not rethrow so we can process all of them
                                                  }
@@ -739,26 +738,30 @@ public class AsyncEventServiceImpl implements AsyncEventService {
      * @return
      */
     private List<QueueMessage> submitToIndex(List<IndexEventResult> indexEventResults) {
-        //if nothing came back then return null
+
+        // if nothing came back then return null
         if(indexEventResults==null){
             return null;
         }
-        IndexOperationMessage combined = new IndexOperationMessage();
 
-        // stream the messages to record the cycle time
+        IndexOperationMessage combined = new IndexOperationMessage();
         List<QueueMessage> queueMessages = indexEventResults.stream()
+
+            // filter out messages that are not present, they were not processed and put into the results
+            .filter( result -> result.getQueueMessage().isPresent() )
             .map(indexEventResult -> {
+
                 //record the cycle time
                 messageCycle.update(System.currentTimeMillis() - indexEventResult.getCreationTime());
+
+                // ingest each index op into our combined, single index op for the index producer
                 if(indexEventResult.getIndexOperationMessage().isPresent()){
                     combined.ingest(indexEventResult.getIndexOperationMessage().get());
                 }
-                return indexEventResult;
+
+                return indexEventResult.getQueueMessage().get();
             })
-            // filter out messages that are not present, they were not processed and put into the results
-            .filter( result -> result.getQueueMessage().isPresent() )
-            .map(result -> result.getQueueMessage().get())
-            // collect
+            // collect into a list of QueueMessages that can be ack'd later
             .collect(Collectors.toList());
 
         // sumbit the requests to Elasticsearch

http://git-wip-us.apache.org/repos/asf/usergrid/blob/84357831/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
index a7d299e..ca6e011 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
@@ -84,7 +84,7 @@ public interface QueueFig extends GuicyFig {
     int getVisibilityTimeout();
 
     @Key( "usergrid.queue.localquorum.timeout")
-    @Default("5000") // 5 seconds
+    @Default("30000") // 30 seconds
     int getLocalQuorumTimeout();
 
     @Key( "usergrid.queue.client.connection.timeout")

http://git-wip-us.apache.org/repos/asf/usergrid/blob/84357831/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
index 55f79f4..f8ce6ef 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
@@ -23,6 +23,7 @@ public class QueueMessage {
     private final String handle;
     private final String type;
     private String stringBody;
+    private int receiveCount;
 
 
     public QueueMessage(String messageId, String handle, Object body,String type) {
@@ -31,6 +32,7 @@ public class QueueMessage {
         this.handle = handle;
         this.type = type;
         this.stringBody = "";
+        this.receiveCount = 1; // we'll always receive once if we're taking it off the in mem or AWS queue
     }
 
     public String getHandle() {
@@ -57,4 +59,12 @@ public class QueueMessage {
     public String getStringBody() {
         return stringBody;
     }
+
+    public void setReceiveCount(int receiveCount){
+        this.receiveCount = receiveCount;
+    }
+
+    public int getReceiveCount(){
+        return receiveCount;
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/84357831/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 6c78035..0be5bd0 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -427,7 +427,12 @@ public class SNSQueueManagerImpl implements QueueManager {
             logger.trace( "Getting up to {} messages from {}", limit, url );
         }
 
+        ArrayList<String> requestMessageAttributeNames = new ArrayList<String>(1);
+        requestMessageAttributeNames.add("ApproximateReceiveCount");
+
+
         ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest( url );
+        receiveMessageRequest.setAttributeNames(requestMessageAttributeNames);
         receiveMessageRequest.setMaxNumberOfMessages( limit );
         receiveMessageRequest.setVisibilityTimeout(
             Math.max( MIN_VISIBILITY_TIMEOUT, fig.getVisibilityTimeout() / 1000 ) );
@@ -477,6 +482,8 @@ public class SNSQueueManagerImpl implements QueueManager {
                 QueueMessage queueMessage = new QueueMessage( message.getMessageId(), message.getReceiptHandle(), payload,
                     message.getAttributes().get( "type" ) );
                 queueMessage.setStringBody( originalBody );
+                int receiveCount = Integer.valueOf(message.getAttributes().get("ApproximateReceiveCount"));
+                queueMessage.setReceiveCount( receiveCount );
                 queueMessages.add( queueMessage );
             }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/84357831/stack/rest/src/main/resources/usergrid-rest-deploy-context.xml
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/resources/usergrid-rest-deploy-context.xml b/stack/rest/src/main/resources/usergrid-rest-deploy-context.xml
index 4438bbd..012f0c8 100644
--- a/stack/rest/src/main/resources/usergrid-rest-deploy-context.xml
+++ b/stack/rest/src/main/resources/usergrid-rest-deploy-context.xml
@@ -26,16 +26,23 @@
 	<import resource="classpath:/usergrid-rest-context.xml" />
 
 
-	<bean id="properties"
-		class="org.springframework.beans.factory.config.PropertiesFactoryBean">
-		<property name="singleton" value="true" />
-		<property name="ignoreResourceNotFound" value="true" />
-		<property name="locations">
-			<list>
-				<value>classpath:/usergrid-default.properties</value>
-				<value>classpath:/usergrid-deployment.properties</value>
-			</list>
-		</property>
-	</bean>
+    <bean id="properties"
+          class="org.springframework.beans.factory.config.PropertiesFactoryBean">
+        <property name="properties" ref="sysProps"/>
+        <property name="localOverride" value="true"/>
+        <property name="singleton" value="true" />
+        <property name="ignoreResourceNotFound" value="true" />
+        <property name="locations">
+            <list>
+                <value>classpath:/usergrid-default.properties</value>
+                <value>classpath:/usergrid-deployment.properties</value>
+            </list>
+        </property>
+    </bean>
+
+    <bean id="sysProps" class="org.springframework.beans.factory.config.MethodInvokingFactoryBean">
+        <property name="targetClass"><value>java.lang.System</value></property>
+        <property name="targetMethod"><value>getProperties</value></property>
+    </bean>
 
 </beans>