You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/10/15 16:18:20 UTC
[30/50] [abbrv] usergrid git commit: remove observables in favor of
streams()
remove observables in favor of streams()
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/b4855172
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/b4855172
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/b4855172
Branch: refs/heads/asf-site
Commit: b4855172283a62d178d157be112be345c1e150d8
Parents: 0692faa
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Oct 8 18:07:23 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Oct 8 18:07:23 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 95 +++++++++++---------
1 file changed, 51 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4855172/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 5a46aed..0ac3860 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
import com.google.common.base.Optional;
import org.apache.usergrid.persistence.index.impl.IndexProducer;
@@ -255,22 +256,21 @@ public class AmazonAsyncEventService implements AsyncEventService {
logger.debug("handleMessages with {} message", messages.size());
}
- final int bufferSize = messages.size();
- Observable<IndexEventResult> masterObservable = Observable.from(messages).flatMap(message -> {
+ Observable<IndexEventResult> masterObservable = Observable.from(messages).map(message -> {
AsyncEvent event = null;
- try{
+ try {
event = (AsyncEvent) message.getBody();
- }catch (ClassCastException cce){
- logger.error("Failed to deserialize message body",cce);
+ } catch (ClassCastException cce) {
+ logger.error("Failed to deserialize message body", cce);
}
if (event == null) {
logger.error("AsyncEvent type or event is null!");
- return Observable.just(new IndexEventResult(Optional.fromNullable(message), Optional.<IndexOperationMessage>absent(),System.currentTimeMillis()));
+ return new IndexEventResult(Optional.fromNullable(message), Optional.<IndexOperationMessage>absent(), System.currentTimeMillis());
}
final AsyncEvent thisEvent = event;
- if(logger.isDebugEnabled()) {
+ if (logger.isDebugEnabled()) {
logger.debug("Processing {} event", event);
}
@@ -287,53 +287,55 @@ public class AmazonAsyncEventService implements AsyncEventService {
indexoperationObservable = handleEntityIndexUpdate(message);
} else if (event instanceof InitializeApplicationIndexEvent) {
//does not return observable
- handleInitializeApplicationIndex(event,message);
+ handleInitializeApplicationIndex(event, message);
indexoperationObservable = Observable.just(new IndexOperationMessage());
} else {
throw new Exception("Unknown EventType");//TODO: print json instead
}
+ //collect all of the
+ IndexOperationMessage indexOperationMessage =
+ indexoperationObservable
+ .collect(() -> new IndexOperationMessage(), (collector, single ) -> collector.ingest(single))
+ .toBlocking().lastOrDefault(null);
+
//return type that can be indexed and ack'd later
- return indexoperationObservable
- .map(indexOperationMessage ->
- new IndexEventResult(Optional.fromNullable(message), Optional.fromNullable(indexOperationMessage),thisEvent.getCreationTime())
- );
+ return new IndexEventResult(Optional.fromNullable(message), Optional.fromNullable(indexOperationMessage), thisEvent.getCreationTime());
} catch (Exception e) {
logger.error("Failed to index message: " + message.getMessageId(), e, message);
- return Observable.just(new IndexEventResult(Optional.absent(), Optional.<IndexOperationMessage>absent(), event.getCreationTime()));
+ return new IndexEventResult(Optional.absent(), Optional.<IndexOperationMessage>absent(), event.getCreationTime());
}
});
+ //resolve the list and return it.
+ final List<IndexEventResult> indexEventResults = masterObservable.toList().toBlocking().lastOrDefault(null);
+ //if nothing came back then return null
+ if(indexEventResults==null){
+ return null;
+ }
- //filter for success, send to the index(optional), ack
- return masterObservable
- //take the max
- .buffer(bufferSize)
- //map them to index results and return them
- .flatMap(indexEventResults -> {
- IndexOperationMessage combined = new IndexOperationMessage();
- indexEventResults.stream().forEach(
- indexEventResult -> {
- if (indexEventResult.getIndexOperationMessage().isPresent()) {
- combined.ingest(indexEventResult.getIndexOperationMessage().get());
- }
- });
-
+ final IndexOperationMessage combined = new IndexOperationMessage();
+ //stream and filer the messages
+ List<QueueMessage> messagesToAck = indexEventResults.stream()
+ .map(indexEventResult -> {
+ if (indexEventResult.getIndexOperationMessage().isPresent()) {
+ combined.ingest(indexEventResult.getIndexOperationMessage().get());
+ }
+ return indexEventResult;
+ })
+ .filter(indexEventResult -> indexEventResult.getQueueMessage().isPresent())
+ .map(indexEventResult -> {
+ messageCycle.update(System.currentTimeMillis() - indexEventResult.getCreationTime());
+ return indexEventResult;
+ })
//ack after successful completion of the operation.
- return indexProducer.put(combined)
- //change observable type
- .flatMap(indexOperationMessage -> Observable.from(indexEventResults))
- //remove unsuccessful
- .filter(indexEventResult -> indexEventResult.getQueueMessage().isPresent())
- //measure
- .doOnNext(indexEventResult -> messageCycle.update(System.currentTimeMillis() - indexEventResult.getCreationTime()))
- //return the queue messages to ack
- .map(result -> result.getQueueMessage().get())
- .toList();
+ .map(result -> result.getQueueMessage().get())
+ .collect(Collectors.toList());
- })
- .doOnError(t -> logger.error("Failed to process queuemessages",t))
- .toBlocking().lastOrDefault(null);
+ //send the batch
+ indexProducer.put(combined).toBlocking().lastOrDefault(null);
+
+ return messagesToAck;
}
@@ -473,11 +475,11 @@ public class AmazonAsyncEventService implements AsyncEventService {
entityDeleteResults = eventBuilder.buildEntityDelete( applicationScope, entityId );
- final Observable<IndexOperationMessage> merged = entityDeleteResults
+ entityDeleteResults
.getEntitiesCompacted()
- .collect(() -> new ArrayList<>(), (list, item) -> list.add(item))
- .flatMap(collected -> entityDeleteResults.getIndexObservable()) ;
- return merged;
+ .collect(() -> new ArrayList<>(), (list, item) -> list.add(item)).toBlocking().lastOrDefault(null);
+
+ return entityDeleteResults.getIndexObservable();
}
@@ -574,10 +576,15 @@ public class AmazonAsyncEventService implements AsyncEventService {
List<QueueMessage> messagesToAck = handleMessages(messages);
if (messagesToAck == null || messagesToAck.size() == 0) {
+ logger.error("No messages came back from the queue operation",messages);
return messagesToAck;
}
+ if(messagesToAck.size()<messages.size()){
+ logger.error("Missing messages from queue post operation",messages,messagesToAck);
+ }
//ack each message, but only if we didn't error.
ack(messagesToAck);
+ //messagesToAck.stream().forEach(message -> ack(message));
return messagesToAck;
} catch (Exception e) {
logger.error("failed to ack messages to sqs", messages.get(0).getMessageId(), e);