You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/11/12 16:51:53 UTC
[05/32] usergrid git commit: Fix issue where read repair was causing
empty messages to be queued.
Fix issue where read repair was causing empty messages to be queued.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/91354471
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/91354471
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/91354471
Branch: refs/heads/master
Commit: 913544719ddc84c1eb6cbbdfe463a7712ad73051
Parents: dfc70f4
Author: Michael Russo <mi...@gmail.com>
Authored: Wed Nov 4 14:02:53 2015 -0800
Committer: Michael Russo <mi...@gmail.com>
Committed: Wed Nov 4 14:02:53 2015 -0800
----------------------------------------------------------------------
.../corepersistence/asyncevents/AmazonAsyncEventService.java | 8 +++++---
.../pipeline/read/traverse/AbstractReadGraphFilter.java | 2 +-
.../persistence/index/impl/IndexOperationMessage.java | 4 ++--
3 files changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/91354471/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 6b9abbc..16e119c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -490,6 +490,11 @@ public class AmazonAsyncEventService implements AsyncEventService {
*/
public void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage ) {
+ // don't try to produce something with nothing
+ if(indexOperationMessage.isEmpty()){
+ return;
+ }
+
final String jsonValue = ObjectJsonSerializer.INSTANCE.toString( indexOperationMessage );
final UUID newMessageId = UUIDGenerator.newTimeUUID();
@@ -760,10 +765,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
.map(result -> result.getQueueMessage().get())
.collect(Collectors.toList());
- //only Q it if it's empty
- if(!combined.isEmpty()) {
queueIndexOperationMessage( combined );
- }
return messagesToAck;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/91354471/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
index 89230d7..78a3450 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -245,8 +245,8 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector() {
return observable -> observable
- .filter((IndexOperationMessage msg) -> !msg.isEmpty())
.collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
+ .filter(msg -> !msg.isEmpty())
.doOnNext(indexOperation -> {
asyncEventService.queueIndexOperationMessage(indexOperation);
});
http://git-wip-us.apache.org/repos/asf/usergrid/blob/91354471/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
index bcee308..7d19ce3 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
@@ -115,7 +115,7 @@ public class IndexOperationMessage implements Serializable {
}
public void ingest(IndexOperationMessage singleMessage) {
- this.indexRequests.addAll(singleMessage.getIndexRequests().stream().collect(Collectors.toList()));
- this.deIndexRequests.addAll(singleMessage.getDeIndexRequests().stream().collect(Collectors.toList()));
+ this.indexRequests.addAll(singleMessage.getIndexRequests());
+ this.deIndexRequests.addAll(singleMessage.getDeIndexRequests());
}
}