You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/10/14 23:50:44 UTC
[29/32] usergrid git commit: add better logging for empty sequences
add better logging for empty sequences
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/9cbe283d
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/9cbe283d
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/9cbe283d
Branch: refs/heads/master
Commit: 9cbe283d0af45b877fb93dfae81cdb343cec26c5
Parents: a2a07aa
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Oct 12 16:27:13 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Oct 12 16:27:13 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 13 ++++++-----
.../asyncevents/EventBuilderImpl.java | 3 ++-
.../core/rx/ExceptionBehaviorTest.java | 24 ++++++++++++++++++++
.../persistence/queue/QueueMessage.java | 10 ++++++++
.../queue/impl/SNSQueueManagerImpl.java | 2 ++
.../queue/impl/SQSQueueManagerImpl.java | 1 +
6 files changed, 46 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9cbe283d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 45d83cd..f1a02ce 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -299,17 +299,18 @@ public class AmazonAsyncEventService implements AsyncEventService {
//collect all of the
IndexOperationMessage indexOperationMessage =
indexoperationObservable
- .collect(() -> new IndexOperationMessage(), (collector, single ) -> collector.ingest(single))
+ .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
.toBlocking().lastOrDefault(null);
- if(indexOperationMessage == null){
- throw new IllegalArgumentException("Received null index operation.");
+ if (indexOperationMessage == null || indexOperationMessage.isEmpty()) {
+ logger.info("Received empty index sequence message:({}), body:({}) ",
+ message.getMessageId(),message.getStringBody());
}
//return type that can be indexed and ack'd later
return new IndexEventResult(Optional.fromNullable(message), Optional.fromNullable(indexOperationMessage), thisEvent.getCreationTime());
} catch (Exception e) {
- logger.error("Failed to index message: " + message.getMessageId(), e, message);
+ logger.error("Failed to index message: " + message.getMessageId(), message.getStringBody() ,e);
return new IndexEventResult(Optional.absent(), Optional.<IndexOperationMessage>absent(), event.getCreationTime());
}
});
@@ -559,7 +560,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
List<IndexEventResult> indexEventResults = callEventHandlers(messages);
List<QueueMessage> messagesToAck = submitToIndex(indexEventResults);
if (messagesToAck == null || messagesToAck.size() == 0) {
- logger.error("No messages came back from the queue operation",messages);
+ logger.error("No messages came back from the queue operation should have seen "+messages.size(),messages);
return messagesToAck;
}
if(messagesToAck.size()<messages.size()){
@@ -569,7 +570,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
ack(messagesToAck);
return messagesToAck;
} catch (Exception e) {
- logger.error("failed to ack messages to sqs", messages, e);
+ logger.error("failed to ack messages to sqs", e);
return null;
//do not rethrow so we can process all of them
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9cbe283d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index 4bf5695..cc0356b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -181,8 +181,9 @@ public class EventBuilderImpl implements EventBuilder {
return true;
}
+ //entityIndexOperation.getUpdatedSince will always be 0 except for reindexing the application
//only re-index if it has been updated and been updated after our timestamp
- return modified.getValue() >= entityIndexOperation.getUpdatedSince();
+ return modified.getValue() >= entityIndexOperation.getUpdatedSince();
} )
//perform indexing on the task scheduler and start it
.flatMap( entity -> indexService.indexEntity( applicationScope, entity ) );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9cbe283d/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/ExceptionBehaviorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/ExceptionBehaviorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/ExceptionBehaviorTest.java
index 8e4f4c4..cb39ca1 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/ExceptionBehaviorTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/ExceptionBehaviorTest.java
@@ -18,12 +18,16 @@
package org.apache.usergrid.persistence.core.rx;
+import org.junit.Assert;
import org.junit.Test;
import rx.Observable;
import rx.Observer;
import rx.schedulers.Schedulers;
+import java.util.ArrayList;
+import java.util.List;
+
/**
* Tests RX exception behavior
@@ -47,6 +51,26 @@ public class ExceptionBehaviorTest {
} ).toBlocking().last();
}
+ /**
+ * Collecting a sequence that emits no items must still produce the (empty)
+ * collector created by the state factory, not the null default passed to
+ * lastOrDefault.
+ */
+ @Test()
+ public void testSequence(){
+ final List<Object> listReturn = Observable.range(0, 1).flatMap(i -> Observable.empty())
+ .collect(() -> new ArrayList<Object>(), (list, i) -> list.add(i))
+ .toBlocking().lastOrDefault(null);
+
+ //expected value goes first so a failure message reports expected/actual the right way round
+ Assert.assertEquals(new ArrayList<Object>(), listReturn);
+ }
+
+ /**
+ * Same check as testSequence, but the source is buffered first: when every
+ * buffer is flat-mapped to an empty observable, collect must still yield the
+ * empty collector and not the null default passed to lastOrDefault.
+ */
+ @Test()
+ public void testSequence2(){
+ final List<Object> listReturn = Observable.range(0, 2).buffer(2).flatMap(i -> Observable.empty())
+ .collect(() -> new ArrayList<Object>(), (list, i) -> list.add(i))
+ .toBlocking().lastOrDefault(null);
+
+ //expected value goes first so a failure message reports expected/actual the right way round
+ Assert.assertEquals(new ArrayList<Object>(), listReturn);
+ }
+
//
// /**
// * This shows that no re-throw happens on subscribe. This is as designed, but not as expected
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9cbe283d/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
index 0874e9c..55f79f4 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
@@ -22,6 +22,7 @@ public class QueueMessage {
private final String messageId;
private final String handle;
private final String type;
+ private String stringBody;
public QueueMessage(String messageId, String handle, Object body,String type) {
@@ -29,6 +30,7 @@ public class QueueMessage {
this.messageId = messageId;
this.handle = handle;
this.type = type;
+ this.stringBody = "";
}
public String getHandle() {
@@ -47,4 +49,12 @@ public class QueueMessage {
public String getType() {
return type;
}
+
+ /**
+ * Stores the given string as this message's string body, replacing the
+ * empty string assigned by the constructor.
+ *
+ * @param stringBody the string form of the body to keep on this message
+ */
+ public void setStringBody(String stringBody) {
+ this.stringBody = stringBody;
+ }
+
+ /**
+ * Returns the string body held by this message.
+ *
+ * @return the value last passed to setStringBody, or the empty string
+ * assigned by the constructor if it was never set
+ */
+ public String getStringBody() {
+ return stringBody;
+ }
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9cbe283d/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index bc63f53..a2b5d72 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -364,6 +364,7 @@ public class SNSQueueManagerImpl implements QueueManager {
for (Message message : messages) {
Object body;
+ final String originalBody = message.getBody();
try {
final JsonNode bodyNode = mapper.readTree(message.getBody());
@@ -375,6 +376,7 @@ public class SNSQueueManagerImpl implements QueueManager {
}
QueueMessage queueMessage = new QueueMessage(message.getMessageId(), message.getReceiptHandle(), body, message.getAttributes().get("type"));
+ queueMessage.setStringBody(originalBody);
queueMessages.add(queueMessage);
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9cbe283d/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index daa1cb5..fa9a7ac 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -196,6 +196,7 @@ public class SQSQueueManagerImpl implements QueueManager {
}
QueueMessage queueMessage = new QueueMessage(message.getMessageId(), message.getReceiptHandle(), body, message.getAttributes().get("type"));
+ queueMessage.setStringBody(message.getBody());
queueMessages.add(queueMessage);
}