You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/10/14 23:50:39 UTC

[24/32] usergrid git commit: remove observables in favor of streams()

remove observables in favor of streams()


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/b4855172
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/b4855172
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/b4855172

Branch: refs/heads/master
Commit: b4855172283a62d178d157be112be345c1e150d8
Parents: 0692faa
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Oct 8 18:07:23 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Oct 8 18:07:23 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 95 +++++++++++---------
 1 file changed, 51 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4855172/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 5a46aed..0ac3860 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 import com.google.common.base.Optional;
 import org.apache.usergrid.persistence.index.impl.IndexProducer;
@@ -255,22 +256,21 @@ public class AmazonAsyncEventService implements AsyncEventService {
             logger.debug("handleMessages with {} message", messages.size());
         }
 
-        final int bufferSize = messages.size();
-        Observable<IndexEventResult> masterObservable = Observable.from(messages).flatMap(message -> {
+        Observable<IndexEventResult> masterObservable = Observable.from(messages).map(message -> {
             AsyncEvent event = null;
-            try{
+            try {
                 event = (AsyncEvent) message.getBody();
-            }catch (ClassCastException cce){
-                logger.error("Failed to deserialize message body",cce);
+            } catch (ClassCastException cce) {
+                logger.error("Failed to deserialize message body", cce);
             }
 
             if (event == null) {
                 logger.error("AsyncEvent type or event is null!");
-                return Observable.just(new IndexEventResult(Optional.fromNullable(message), Optional.<IndexOperationMessage>absent(),System.currentTimeMillis()));
+                return new IndexEventResult(Optional.fromNullable(message), Optional.<IndexOperationMessage>absent(), System.currentTimeMillis());
             }
 
             final AsyncEvent thisEvent = event;
-            if(logger.isDebugEnabled()) {
+            if (logger.isDebugEnabled()) {
                 logger.debug("Processing {} event", event);
             }
 
@@ -287,53 +287,55 @@ public class AmazonAsyncEventService implements AsyncEventService {
                     indexoperationObservable = handleEntityIndexUpdate(message);
                 } else if (event instanceof InitializeApplicationIndexEvent) {
                     //does not return observable
-                    handleInitializeApplicationIndex(event,message);
+                    handleInitializeApplicationIndex(event, message);
                     indexoperationObservable = Observable.just(new IndexOperationMessage());
                 } else {
                     throw new Exception("Unknown EventType");//TODO: print json instead
                 }
 
+                //collect all of the emitted index operations into a single IndexOperationMessage
+                IndexOperationMessage indexOperationMessage =
+                    indexoperationObservable
+                        .collect(() -> new IndexOperationMessage(), (collector, single ) -> collector.ingest(single))
+                        .toBlocking().lastOrDefault(null);
+
                 //return type that can be indexed and ack'd later
-                return indexoperationObservable
-                    .map(indexOperationMessage ->
-                            new IndexEventResult(Optional.fromNullable(message), Optional.fromNullable(indexOperationMessage),thisEvent.getCreationTime())
-                    );
+                return new IndexEventResult(Optional.fromNullable(message), Optional.fromNullable(indexOperationMessage), thisEvent.getCreationTime());
             } catch (Exception e) {
                 logger.error("Failed to index message: " + message.getMessageId(), e, message);
-                return Observable.just(new IndexEventResult(Optional.absent(), Optional.<IndexOperationMessage>absent(), event.getCreationTime()));
+                return new IndexEventResult(Optional.absent(), Optional.<IndexOperationMessage>absent(), event.getCreationTime());
             }
         });
+        //resolve the list and return it.
+        final List<IndexEventResult> indexEventResults = masterObservable.toList().toBlocking().lastOrDefault(null);
+        //if nothing came back then return null
+        if(indexEventResults==null){
+            return null;
+        }
 
-        //filter for success, send to the index(optional), ack
-        return masterObservable
-            //take the max
-            .buffer(bufferSize)
-            //map them to index results and return them
-            .flatMap(indexEventResults -> {
-                IndexOperationMessage combined = new IndexOperationMessage();
-                indexEventResults.stream().forEach(
-                    indexEventResult -> {
-                        if (indexEventResult.getIndexOperationMessage().isPresent()) {
-                            combined.ingest(indexEventResult.getIndexOperationMessage().get());
-                        }
-                    });
-
+        final IndexOperationMessage combined = new IndexOperationMessage();
 
+        //stream and filter the messages
+        List<QueueMessage> messagesToAck = indexEventResults.stream()
+            .map(indexEventResult -> {
+                if (indexEventResult.getIndexOperationMessage().isPresent()) {
+                    combined.ingest(indexEventResult.getIndexOperationMessage().get());
+                }
+                return indexEventResult;
+            })
+            .filter(indexEventResult -> indexEventResult.getQueueMessage().isPresent())
+            .map(indexEventResult -> {
+                messageCycle.update(System.currentTimeMillis() - indexEventResult.getCreationTime());
+                return indexEventResult;
+            })
                 //ack after successful completion of the operation.
-                return indexProducer.put(combined)
-                    //change observable type
-                    .flatMap(indexOperationMessage -> Observable.from(indexEventResults))
-                        //remove unsuccessful
-                    .filter(indexEventResult -> indexEventResult.getQueueMessage().isPresent())
-                    //measure
-                    .doOnNext(indexEventResult -> messageCycle.update(System.currentTimeMillis() - indexEventResult.getCreationTime()))
-                    //return the queue messages to ack
-                    .map(result -> result.getQueueMessage().get())
-                    .toList();
+            .map(result -> result.getQueueMessage().get())
+            .collect(Collectors.toList());
 
-            })
-            .doOnError(t -> logger.error("Failed to process queuemessages",t))
-            .toBlocking().lastOrDefault(null);
+        //send the batch
+        indexProducer.put(combined).toBlocking().lastOrDefault(null);
+
+        return messagesToAck;
     }
 
 
@@ -473,11 +475,11 @@ public class AmazonAsyncEventService implements AsyncEventService {
             entityDeleteResults = eventBuilder.buildEntityDelete( applicationScope, entityId );
 
 
-        final Observable<IndexOperationMessage> merged = entityDeleteResults
+        entityDeleteResults
             .getEntitiesCompacted()
-            .collect(() -> new ArrayList<>(), (list, item) -> list.add(item))
-            .flatMap(collected -> entityDeleteResults.getIndexObservable()) ;
-        return merged;
+            .collect(() -> new ArrayList<>(), (list, item) -> list.add(item)).toBlocking().lastOrDefault(null);
+
+        return entityDeleteResults.getIndexObservable();
     }
 
 
@@ -574,10 +576,15 @@ public class AmazonAsyncEventService implements AsyncEventService {
                                     List<QueueMessage> messagesToAck = handleMessages(messages);
 
                                     if (messagesToAck == null || messagesToAck.size() == 0) {
+                                        logger.error("No messages came back from the queue operation",messages);
                                         return messagesToAck;
                                     }
+                                    if(messagesToAck.size()<messages.size()){
+                                        logger.error("Missing messages from queue post operation",messages,messagesToAck);
+                                    }
                                     //ack each message, but only if we didn't error.
                                     ack(messagesToAck);
+                                    //messagesToAck.stream().forEach(message -> ack(message));
                                     return messagesToAck;
                                 } catch (Exception e) {
                                     logger.error("failed to ack messages to sqs", messages.get(0).getMessageId(), e);