You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2015/10/20 20:02:07 UTC
[04/12] usergrid git commit: Fixes empty payload notification issue.
Fixes empty payload notification issue.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/0326629a
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/0326629a
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/0326629a
Branch: refs/heads/2.1-release
Commit: 0326629a24cec3bd44d91810b4b8f0516c69c9b8
Parents: 3e15585
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Oct 19 13:53:30 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Oct 19 13:53:30 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 55 ++++++++++++--------
.../asyncevents/AsyncIndexProvider.java | 4 +-
.../index/AmazonAsyncEventServiceTest.java | 2 +-
3 files changed, 36 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0326629a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index f8ef5e7..6b2eb45 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -89,6 +89,21 @@ import rx.Subscription;
import rx.schedulers.Schedulers;
+/**
+ * TODO: this whole class is becoming a nightmare. We need to remove all consume logic from this class and refactor it in the following manner.
+ *
+ * 1. Produce. Keep the code in the handle as is
+ * 2. Consume: Move the code into a refactored system
+ * 2.1 A central dispatcher
+ * 2.2 An interface that produces an observable of type BatchOperation. Any handler will be refactored into its own
+ * impl that will then emit a stream of batch operations to perform
+ * 2.3 The central dispatcher will then subscribe to these events and merge them. Handing them off to a batch handler
+ * 2.4 The batch handler will roll up the operations into a batch size, and then queue them
+ * 2.5 The receive batch handler will execute the batch operations
+ *
+ * TODO determine how we error handle?
+ *
+ */
@Singleton
public class AmazonAsyncEventService implements AsyncEventService {
@@ -360,7 +375,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) {
IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(
applicationScope );
- offerTopic(new InitializeApplicationIndexEvent(queueFig.getPrimaryRegion(), new ReplicatedIndexLocationStrategy(indexLocationStrategy)));
+ offerTopic( new InitializeApplicationIndexEvent( queueFig.getPrimaryRegion(),
+ new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ) );
}
@@ -503,35 +519,29 @@ public class AmazonAsyncEventService implements AsyncEventService {
final String message = esMapPersistence.getString( messageId.toString() );
- String highConsistency = null;
+ final IndexOperationMessage indexOperationMessage;
if(message == null){
logger.error( "Received message with id {} to process, unable to find it, reading with higher consistency level" );
- highConsistency = esMapPersistence.getStringHighConsistency( messageId.toString() );
-
- }
+ final String highConsistency = esMapPersistence.getStringHighConsistency( messageId.toString() );
- //read the value from the string
+ if(highConsistency == null){
+ logger.error( "Unable to find the ES batch with id {} to process at a higher consistency level" );
- final IndexOperationMessage indexOperationMessage;
+ throw new RuntimeException( "Unable to find the ES batch to process with message id " + messageId );
+ }
- //our original local read has it, parse it.
- if(message != null){
- indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class );
- }
- //we tried to read it at a higher consistency level and it works
- else if (highConsistency != null){
indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( highConsistency, IndexOperationMessage.class );
- }
- //we couldn't find it, bail
- else{
- logger.error( "Unable to find the ES batch with id {} to process at a higher consistency level" );
-
- throw new RuntimeException( "Unable to find the ES batch to process with message id " + messageId );
+ } else{
+ indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class );
}
+ //read the value from the string
+
+ Preconditions.checkNotNull( indexOperationMessage, "indexOperationMessage cannot be null" );
+ Preconditions.checkArgument( !indexOperationMessage.isEmpty() , "queued indexOperationMessage messages should not be empty" );
//now execute it
@@ -728,9 +738,10 @@ public class AmazonAsyncEventService implements AsyncEventService {
.map(result -> result.getQueueMessage().get())
.collect(Collectors.toList());
- //send the batch
- //TODO: should retry?
- queueIndexOperationMessage( combined );
+ //only Q it if it's not empty
+ if(!combined.isEmpty()) {
+ queueIndexOperationMessage( combined );
+ }
return messagesToAck;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0326629a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index 1649046..2bace8d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -105,10 +105,10 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
case LOCAL:
return new InMemoryAsyncEventService(eventBuilder, rxTaskScheduler, indexProducer,indexProcessorFig.resolveSynchronously());
case SQS:
- throw new IllegalArgumentException("Configuration value of SQS is no longer allowed. Use SNS instead");
+ throw new IllegalArgumentException("Configuration value of SQS is no longer allowed. Use SNS instead with only a single region");
case SNS:
return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
- entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler );
+ entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler );
default:
throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed");
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0326629a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
index 8ee47a2..625a8fd 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
@@ -93,7 +93,7 @@ public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest {
@Override
protected AsyncEventService getAsyncEventService() {
- return new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, rxTaskScheduler );
+ return new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler );
}