You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/03/11 05:47:35 UTC
[3/4] usergrid git commit: Improvements around async event processing.
Improvements around async event processing.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/84357831
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/84357831
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/84357831
Branch: refs/heads/master
Commit: 8435783104a5e01bdc0c6b257ebe29eac71765a3
Parents: 36a0161
Author: Michael Russo <mr...@apigee.com>
Authored: Thu Mar 10 20:44:40 2016 -0800
Committer: Michael Russo <mr...@apigee.com>
Committed: Thu Mar 10 20:44:40 2016 -0800
----------------------------------------------------------------------
.../asyncevents/AsyncEventServiceImpl.java | 103 ++++++++++---------
.../usergrid/persistence/queue/QueueFig.java | 2 +-
.../persistence/queue/QueueMessage.java | 10 ++
.../queue/impl/SNSQueueManagerImpl.java | 7 ++
.../resources/usergrid-rest-deploy-context.xml | 29 ++++--
5 files changed, 89 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/84357831/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 200d1e6..4d78340 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -282,12 +282,18 @@ public class AsyncEventServiceImpl implements AsyncEventService {
private List<IndexEventResult> callEventHandlers(final List<QueueMessage> messages) {
if (logger.isDebugEnabled()) {
- logger.debug("callEventHandlers with {} message", messages.size());
+ logger.debug("callEventHandlers with {} message(s)", messages.size());
}
Stream<IndexEventResult> indexEventResults = messages.stream().map(message ->
{
+ if(logger.isDebugEnabled()){
+ logger.debug("Queue message with ID {} has been received {} time(s)",
+ message.getMessageId(),
+ message.getReceiveCount() );
+ }
+
AsyncEvent event = null;
try {
event = (AsyncEvent) message.getBody();
@@ -305,7 +311,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
final AsyncEvent thisEvent = event;
if (logger.isDebugEnabled()) {
- logger.debug("Processing {} event", event);
+ logger.debug("Processing event with type {}", event.getClass().getSimpleName());
}
IndexOperationMessage indexOperationMessage = null;
@@ -333,7 +339,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
} else {
- throw new Exception("Unknown EventType for message: "+ message.getStringBody());
+ throw new Exception("Unknown EventType for message: "+ message.getStringBody().trim());
}
@@ -345,13 +351,15 @@ public class AsyncEventServiceImpl implements AsyncEventService {
// this exception is throw when we wait before trying quorum read on map persistence.
// return empty event result so the event's message doesn't get ack'd
- logger.info(e.getMessage());
+ if(logger.isDebugEnabled()){
+ logger.debug(e.getMessage());
+ }
return new IndexEventResult(Optional.absent(), Optional.absent(), event.getCreationTime());
} catch (Exception e) {
// if the event fails to process, log and return empty message result so it doesn't get ack'd
- logger.error("Failed to process message: {} {}", message.getMessageId(), message.getStringBody(), e);
+ logger.error("{}. Failed to process message: {}", e.getMessage(), message.getStringBody().trim() );
return new IndexEventResult(Optional.absent(), Optional.absent(), event.getCreationTime());
}
});
@@ -471,59 +479,49 @@ public class AsyncEventServiceImpl implements AsyncEventService {
}
public IndexOperationMessage handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){
- Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" );
- final UUID messageId = elasticsearchIndexEvent.getIndexBatchId();
+ Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" );
+ final UUID messageId = elasticsearchIndexEvent.getIndexBatchId();
Preconditions.checkNotNull( messageId, "messageId must not be null" );
- //load the entity
-
final String message = esMapPersistence.getString( messageId.toString() );
- final IndexOperationMessage indexOperationMessage;
+ final IndexOperationMessage indexOperationMessage;
if(message == null) {
- if ( System.currentTimeMillis() > elasticsearchIndexEvent.getCreationTime() + queueFig.getLocalQuorumTimeout() ) {
+ // provide some time back pressure before performing a quorum read
+ if ( System.currentTimeMillis() > elasticsearchIndexEvent.getCreationTime() + queueFig.getLocalQuorumTimeout() ) {
- logger.warn("Received message with id {} to process, unable to find it, reading with higher consistency level",
- messageId);
-
- final String highConsistency = esMapPersistence.getStringHighConsistency(messageId.toString());
+ if(logger.isDebugEnabled()){
+ logger.debug("ES batch with id {} not found, reading with strong consistency", messageId);
+ }
- if (highConsistency == null) {
- logger.error("Unable to find the ES batch with id {} to process at a higher consistency level",
- messageId);
+ final String highConsistency = esMapPersistence.getStringHighConsistency(messageId.toString());
+ if (highConsistency == null) {
- throw new RuntimeException("Unable to find the ES batch to process with message id " + messageId);
+ throw new RuntimeException("ES batch with id "+messageId+" not found when reading with strong consistency");
}
indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString(highConsistency, IndexOperationMessage.class);
- } else{
+ } else {
throw new IndexDocNotFoundException(elasticsearchIndexEvent.getIndexBatchId());
}
- } else{
+ } else {
+
indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class );
}
- initializeEntityIndexes(indexOperationMessage);
-
- //NOTE that we intentionally do NOT delete from the map. We can't know when all regions have consumed the message
- //so we'll let compaction on column expiration handle deletion
-
- //read the value from the string
-
- Preconditions.checkNotNull( indexOperationMessage, "indexOperationMessage cannot be null" );
- Preconditions.checkArgument( !indexOperationMessage.isEmpty() , "queued indexOperationMessage messages should not be empty" );
+ // always do a check to ensure the indexes are initialized for the index requests
+ initializeEntityIndexes(indexOperationMessage);
- //now execute it
return indexOperationMessage;
}
@@ -694,30 +692,31 @@ public class AsyncEventServiceImpl implements AsyncEventService {
.map( messages -> {
if ( messages == null || messages.size() == 0 ) {
+ // no messages came from the queue, move on
return null;
}
try {
+ // process the messages
List<IndexEventResult> indexEventResults = callEventHandlers( messages );
+
+ // submit the processed messages to index producer
List<QueueMessage> messagesToAck = submitToIndex( indexEventResults );
- if ( messagesToAck == null || messagesToAck.size() == 0 ) {
- logger.error(
- "No messages came back from the queue operation, should have seen {} messages",
- messages.size() );
- return messagesToAck;
+ if ( messagesToAck.size() < messages.size() ) {
+ logger.warn( "Missing {} message(s) from index processing",
+ messages.size() - messagesToAck.size() );
}
- if ( messagesToAck.size() < messages.size() ) {
- logger.error( "Missing messages from queue post operation",
- messages, messagesToAck );
+ // ack each message if making it to this point
+ if( messagesToAck.size() > 0 ){
+ ack( messagesToAck );
}
- //ack each message, but only if we didn't error.
- ack( messagesToAck );
+
return messagesToAck;
}
catch ( Exception e ) {
- logger.error( "failed to ack messages to sqs", e );
+ logger.error( "Failed to ack messages", e );
return null;
//do not rethrow so we can process all of them
}
@@ -739,26 +738,30 @@ public class AsyncEventServiceImpl implements AsyncEventService {
* @return
*/
private List<QueueMessage> submitToIndex(List<IndexEventResult> indexEventResults) {
- //if nothing came back then return null
+
+ // if nothing came back then return null
if(indexEventResults==null){
return null;
}
- IndexOperationMessage combined = new IndexOperationMessage();
- // stream the messages to record the cycle time
+ IndexOperationMessage combined = new IndexOperationMessage();
List<QueueMessage> queueMessages = indexEventResults.stream()
+
+ // filter out messages that are not present, they were not processed and put into the results
+ .filter( result -> result.getQueueMessage().isPresent() )
.map(indexEventResult -> {
+
//record the cycle time
messageCycle.update(System.currentTimeMillis() - indexEventResult.getCreationTime());
+
+ // ingest each index op into our combined, single index op for the index producer
if(indexEventResult.getIndexOperationMessage().isPresent()){
combined.ingest(indexEventResult.getIndexOperationMessage().get());
}
- return indexEventResult;
+
+ return indexEventResult.getQueueMessage().get();
})
- // filter out messages that are not present, they were not processed and put into the results
- .filter( result -> result.getQueueMessage().isPresent() )
- .map(result -> result.getQueueMessage().get())
- // collect
+ // collect into a list of QueueMessages that can be ack'd later
.collect(Collectors.toList());
// sumbit the requests to Elasticsearch
http://git-wip-us.apache.org/repos/asf/usergrid/blob/84357831/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
index a7d299e..ca6e011 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
@@ -84,7 +84,7 @@ public interface QueueFig extends GuicyFig {
int getVisibilityTimeout();
@Key( "usergrid.queue.localquorum.timeout")
- @Default("5000") // 5 seconds
+ @Default("30000") // 30 seconds
int getLocalQuorumTimeout();
@Key( "usergrid.queue.client.connection.timeout")
http://git-wip-us.apache.org/repos/asf/usergrid/blob/84357831/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
index 55f79f4..f8ce6ef 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
@@ -23,6 +23,7 @@ public class QueueMessage {
private final String handle;
private final String type;
private String stringBody;
+ private int receiveCount;
public QueueMessage(String messageId, String handle, Object body,String type) {
@@ -31,6 +32,7 @@ public class QueueMessage {
this.handle = handle;
this.type = type;
this.stringBody = "";
+ this.receiveCount = 1; // we'll always receive once if we're taking it off the in mem or AWS queue
}
public String getHandle() {
@@ -57,4 +59,12 @@ public class QueueMessage {
public String getStringBody() {
return stringBody;
}
+
+ public void setReceiveCount(int receiveCount){
+ this.receiveCount = receiveCount;
+ }
+
+ public int getReceiveCount(){
+ return receiveCount;
+ }
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/84357831/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 6c78035..0be5bd0 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -427,7 +427,12 @@ public class SNSQueueManagerImpl implements QueueManager {
logger.trace( "Getting up to {} messages from {}", limit, url );
}
+ ArrayList<String> requestMessageAttributeNames = new ArrayList<String>(1);
+ requestMessageAttributeNames.add("ApproximateReceiveCount");
+
+
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest( url );
+ receiveMessageRequest.setAttributeNames(requestMessageAttributeNames);
receiveMessageRequest.setMaxNumberOfMessages( limit );
receiveMessageRequest.setVisibilityTimeout(
Math.max( MIN_VISIBILITY_TIMEOUT, fig.getVisibilityTimeout() / 1000 ) );
@@ -477,6 +482,8 @@ public class SNSQueueManagerImpl implements QueueManager {
QueueMessage queueMessage = new QueueMessage( message.getMessageId(), message.getReceiptHandle(), payload,
message.getAttributes().get( "type" ) );
queueMessage.setStringBody( originalBody );
+ int receiveCount = Integer.valueOf(message.getAttributes().get("ApproximateReceiveCount"));
+ queueMessage.setReceiveCount( receiveCount );
queueMessages.add( queueMessage );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/84357831/stack/rest/src/main/resources/usergrid-rest-deploy-context.xml
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/resources/usergrid-rest-deploy-context.xml b/stack/rest/src/main/resources/usergrid-rest-deploy-context.xml
index 4438bbd..012f0c8 100644
--- a/stack/rest/src/main/resources/usergrid-rest-deploy-context.xml
+++ b/stack/rest/src/main/resources/usergrid-rest-deploy-context.xml
@@ -26,16 +26,23 @@
<import resource="classpath:/usergrid-rest-context.xml" />
- <bean id="properties"
- class="org.springframework.beans.factory.config.PropertiesFactoryBean">
- <property name="singleton" value="true" />
- <property name="ignoreResourceNotFound" value="true" />
- <property name="locations">
- <list>
- <value>classpath:/usergrid-default.properties</value>
- <value>classpath:/usergrid-deployment.properties</value>
- </list>
- </property>
- </bean>
+ <bean id="properties"
+ class="org.springframework.beans.factory.config.PropertiesFactoryBean">
+ <property name="properties" ref="sysProps"/>
+ <property name="localOverride" value="true"/>
+ <property name="singleton" value="true" />
+ <property name="ignoreResourceNotFound" value="true" />
+ <property name="locations">
+ <list>
+ <value>classpath:/usergrid-default.properties</value>
+ <value>classpath:/usergrid-deployment.properties</value>
+ </list>
+ </property>
+ </bean>
+
+ <bean id="sysProps" class="org.springframework.beans.factory.config.MethodInvokingFactoryBean">
+ <property name="targetClass"><value>java.lang.System</value></property>
+ <property name="targetMethod"><value>getProperties</value></property>
+ </bean>
</beans>