You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/10/14 23:50:44 UTC

[29/32] usergrid git commit: add better logging for empty sequences

add better logging for empty sequences


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/9cbe283d
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/9cbe283d
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/9cbe283d

Branch: refs/heads/master
Commit: 9cbe283d0af45b877fb93dfae81cdb343cec26c5
Parents: a2a07aa
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Oct 12 16:27:13 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Oct 12 16:27:13 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 13 ++++++-----
 .../asyncevents/EventBuilderImpl.java           |  3 ++-
 .../core/rx/ExceptionBehaviorTest.java          | 24 ++++++++++++++++++++
 .../persistence/queue/QueueMessage.java         | 10 ++++++++
 .../queue/impl/SNSQueueManagerImpl.java         |  2 ++
 .../queue/impl/SQSQueueManagerImpl.java         |  1 +
 6 files changed, 46 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/9cbe283d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 45d83cd..f1a02ce 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -299,17 +299,18 @@ public class AmazonAsyncEventService implements AsyncEventService {
                 //collect all of the
                 IndexOperationMessage indexOperationMessage =
                     indexoperationObservable
-                        .collect(() -> new IndexOperationMessage(), (collector, single ) -> collector.ingest(single))
+                        .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
                         .toBlocking().lastOrDefault(null);
 
-                if(indexOperationMessage == null){
-                    throw new IllegalArgumentException("Received null index operation.");
+                if (indexOperationMessage == null || indexOperationMessage.isEmpty()) {
+                    logger.info("Received empty index sequence message:({}), body:({}) ",
+                        message.getMessageId(),message.getStringBody());
                 }
 
                 //return type that can be indexed and ack'd later
                 return new IndexEventResult(Optional.fromNullable(message), Optional.fromNullable(indexOperationMessage), thisEvent.getCreationTime());
             } catch (Exception e) {
-                logger.error("Failed to index message: " + message.getMessageId(), e, message);
+                logger.error("Failed to index message: " + message.getMessageId(), message.getStringBody() ,e);
                 return new IndexEventResult(Optional.absent(), Optional.<IndexOperationMessage>absent(), event.getCreationTime());
             }
         });
@@ -559,7 +560,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
                                     List<IndexEventResult> indexEventResults = callEventHandlers(messages);
                                     List<QueueMessage> messagesToAck = submitToIndex(indexEventResults);
                                     if (messagesToAck == null || messagesToAck.size() == 0) {
-                                        logger.error("No messages came back from the queue operation",messages);
+                                        logger.error("No messages came back from the queue operation should have seen "+messages.size(),messages);
                                         return messagesToAck;
                                     }
                                     if(messagesToAck.size()<messages.size()){
@@ -569,7 +570,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
                                     ack(messagesToAck);
                                     return messagesToAck;
                                 } catch (Exception e) {
-                                    logger.error("failed to ack messages to sqs", messages, e);
+                                    logger.error("failed to ack messages to sqs", e);
                                     return null;
                                     //do not rethrow so we can process all of them
                                 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9cbe283d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index 4bf5695..cc0356b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -181,8 +181,9 @@ public class EventBuilderImpl implements EventBuilder {
                     return true;
                 }
 
+                //entityIndexOperation.getUpdatedSince will always be 0 except for reindexing the application
                 //only re-index if it has been updated and been updated after our timestamp
-                return  modified.getValue() >= entityIndexOperation.getUpdatedSince();
+                return modified.getValue() >= entityIndexOperation.getUpdatedSince();
             } )
             //perform indexing on the task scheduler and start it
             .flatMap( entity -> indexService.indexEntity( applicationScope, entity ) );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9cbe283d/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/ExceptionBehaviorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/ExceptionBehaviorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/ExceptionBehaviorTest.java
index 8e4f4c4..cb39ca1 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/ExceptionBehaviorTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/ExceptionBehaviorTest.java
@@ -18,12 +18,16 @@
 package org.apache.usergrid.persistence.core.rx;
 
 
+import org.junit.Assert;
 import org.junit.Test;
 
 import rx.Observable;
 import rx.Observer;
 import rx.schedulers.Schedulers;
 
+import java.util.ArrayList;
+import java.util.List;
+
 
 /**
  * Tests RX exception behavior
@@ -47,6 +51,26 @@ public class ExceptionBehaviorTest {
         } ).toBlocking().last();
     }
 
+    @Test()
+    public void testSequence(){
+        ArrayList listReturn =  Observable.range(0, 1).flatMap(i -> Observable.empty())
+            .collect(()->new ArrayList(),(list,i) ->{
+                list.add(i);
+            }).toBlocking().lastOrDefault(null);
+
+        Assert.assertEquals(listReturn,new ArrayList<Integer>());
+    }
+
+    @Test()
+    public void testSequence2(){
+        ArrayList listReturn =  Observable.range(0, 2).buffer(2).flatMap(i -> Observable.empty())
+            .collect(()->new ArrayList(),(list,i) ->{
+                list.add(i);
+            }).toBlocking().lastOrDefault(null);
+
+        Assert.assertEquals(listReturn,new ArrayList<Integer>());
+    }
+
 //
 //    /**
 //     * This shows that no re-throw happens on subscribe.  This is as designed, but not as expected

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9cbe283d/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
index 0874e9c..55f79f4 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
@@ -22,6 +22,7 @@ public class QueueMessage {
     private final String messageId;
     private final String handle;
     private final String type;
+    private String stringBody;
 
 
     public QueueMessage(String messageId, String handle, Object body,String type) {
@@ -29,6 +30,7 @@ public class QueueMessage {
         this.messageId = messageId;
         this.handle = handle;
         this.type = type;
+        this.stringBody = "";
     }
 
     public String getHandle() {
@@ -47,4 +49,12 @@ public class QueueMessage {
     public String getType() {
         return type;
     }
+
+    public void setStringBody(String stringBody) {
+        this.stringBody = stringBody;
+    }
+
+    public String getStringBody() {
+        return stringBody;
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9cbe283d/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index bc63f53..a2b5d72 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -364,6 +364,7 @@ public class SNSQueueManagerImpl implements QueueManager {
 
             for (Message message : messages) {
                 Object body;
+                final String originalBody = message.getBody();
 
                 try {
                     final JsonNode bodyNode =  mapper.readTree(message.getBody());
@@ -375,6 +376,7 @@ public class SNSQueueManagerImpl implements QueueManager {
                 }
 
                 QueueMessage queueMessage = new QueueMessage(message.getMessageId(), message.getReceiptHandle(), body, message.getAttributes().get("type"));
+                queueMessage.setStringBody(originalBody);
                 queueMessages.add(queueMessage);
             }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9cbe283d/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index daa1cb5..fa9a7ac 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -196,6 +196,7 @@ public class SQSQueueManagerImpl implements QueueManager {
             }
 
             QueueMessage queueMessage = new QueueMessage(message.getMessageId(), message.getReceiptHandle(), body, message.getAttributes().get("type"));
+            queueMessage.setStringBody(message.getBody());
             queueMessages.add(queueMessage);
         }