You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/10/13 23:35:44 UTC
[1/2] usergrid git commit: remove observable
Repository: usergrid
Updated Branches:
refs/heads/remove-inmemory-event-service 32d35e7d5 -> 298f0d8b8
remove observable
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/dbf37e48
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/dbf37e48
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/dbf37e48
Branch: refs/heads/remove-inmemory-event-service
Commit: dbf37e48caee2ca7fb5a2468f4800ad4bdf196ea
Parents: 9cbe283
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Oct 13 11:14:21 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Oct 13 11:14:21 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 114 +++++++++----------
1 file changed, 57 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/dbf37e48/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index f1a02ce..95126c6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import com.google.common.base.Optional;
import org.apache.usergrid.persistence.index.impl.IndexProducer;
@@ -248,7 +249,6 @@ public class AmazonAsyncEventService implements AsyncEventService {
}
-
/**
* calls the event handlers and returns a result with information on whether it needs to be ack'd and whether it needs to be indexed
* @param messages
@@ -259,71 +259,71 @@ public class AmazonAsyncEventService implements AsyncEventService {
logger.debug("callEventHandlers with {} message", messages.size());
}
- Observable<IndexEventResult> masterObservable = Observable.from(messages).map(message -> {
- AsyncEvent event = null;
- try {
- event = (AsyncEvent) message.getBody();
- } catch (ClassCastException cce) {
- logger.error("Failed to deserialize message body", cce);
- }
-
- if (event == null) {
- logger.error("AsyncEvent type or event is null!");
- return new IndexEventResult(Optional.fromNullable(message), Optional.<IndexOperationMessage>absent(), System.currentTimeMillis());
- }
-
- final AsyncEvent thisEvent = event;
- if (logger.isDebugEnabled()) {
- logger.debug("Processing {} event", event);
- }
-
- try {
- Observable<IndexOperationMessage> indexoperationObservable;
- //merge each operation to a master observable;
- if (event instanceof EdgeDeleteEvent) {
- indexoperationObservable = handleEdgeDelete(message);
- } else if (event instanceof EdgeIndexEvent) {
- indexoperationObservable = handleEdgeIndex(message);
- } else if (event instanceof EntityDeleteEvent) {
- indexoperationObservable = handleEntityDelete(message);
- } else if (event instanceof EntityIndexEvent) {
- indexoperationObservable = handleEntityIndexUpdate(message);
- } else if (event instanceof InitializeApplicationIndexEvent) {
- //does not return observable
- handleInitializeApplicationIndex(event, message);
- indexoperationObservable = Observable.just(new IndexOperationMessage());
- } else {
- throw new Exception("Unknown EventType");//TODO: print json instead
+ Stream<IndexEventResult> indexEventResults = messages.stream()
+ .map(message -> {
+ AsyncEvent event = null;
+ try {
+ event = (AsyncEvent) message.getBody();
+ } catch (ClassCastException cce) {
+ logger.error("Failed to deserialize message body", cce);
}
- //collect all of the
- IndexOperationMessage indexOperationMessage =
- indexoperationObservable
- .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
- .toBlocking().lastOrDefault(null);
+ if (event == null) {
+ logger.error("AsyncEvent type or event is null!");
+ return new IndexEventResult(Optional.fromNullable(message), Optional.<IndexOperationMessage>absent(), System.currentTimeMillis());
+ }
- if (indexOperationMessage == null || indexOperationMessage.isEmpty()) {
- logger.info("Received empty index sequence message:({}), body:({}) ",
- message.getMessageId(),message.getStringBody());
+ final AsyncEvent thisEvent = event;
+ if (logger.isDebugEnabled()) {
+ logger.debug("Processing {} event", event);
}
- //return type that can be indexed and ack'd later
- return new IndexEventResult(Optional.fromNullable(message), Optional.fromNullable(indexOperationMessage), thisEvent.getCreationTime());
- } catch (Exception e) {
- logger.error("Failed to index message: " + message.getMessageId(), message.getStringBody() ,e);
- return new IndexEventResult(Optional.absent(), Optional.<IndexOperationMessage>absent(), event.getCreationTime());
- }
- });
- //resolve the list and return it.
- final List<IndexEventResult> indexEventResults = masterObservable
- .collect(() -> new ArrayList<IndexEventResult>(), (list,indexEventResult) -> list.add(indexEventResult) )
- .toBlocking().lastOrDefault(null);
+ try {
+ //check for empty sets if this is true
+ boolean validateEmptySets = true;
+ Observable<IndexOperationMessage> indexoperationObservable;
+ //merge each operation to a master observable;
+ if (event instanceof EdgeDeleteEvent) {
+ indexoperationObservable = handleEdgeDelete(message);
+ } else if (event instanceof EdgeIndexEvent) {
+ indexoperationObservable = handleEdgeIndex(message);
+ } else if (event instanceof EntityDeleteEvent) {
+ indexoperationObservable = handleEntityDelete(message);
+ } else if (event instanceof EntityIndexEvent) {
+ indexoperationObservable = handleEntityIndexUpdate(message);
+ } else if (event instanceof InitializeApplicationIndexEvent) {
+ //does not return observable
+ handleInitializeApplicationIndex(event, message);
+ indexoperationObservable = Observable.just(new IndexOperationMessage());
+ validateEmptySets = false; //do not check this one for an empty set b/c it will be empty.
+ } else {
+ throw new Exception("Unknown EventType");//TODO: print json instead
+ }
+
+ //collect all of the
+ IndexOperationMessage indexOperationMessage =
+ indexoperationObservable
+ .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
+ .toBlocking().lastOrDefault(null);
+
+ if (validateEmptySets && (indexOperationMessage == null || indexOperationMessage.isEmpty())) {
+ logger.error("Received empty index sequence message:({}), body:({}) ",
+ message.getMessageId(), message.getStringBody());
+ throw new Exception("Received empty index sequence.");
+ }
+
+ //return type that can be indexed and ack'd later
+ return new IndexEventResult(Optional.fromNullable(message), Optional.fromNullable(indexOperationMessage), thisEvent.getCreationTime());
+ } catch (Exception e) {
+ logger.error("Failed to index message: " + message.getMessageId(), message.getStringBody(), e);
+ return new IndexEventResult(Optional.absent(), Optional.<IndexOperationMessage>absent(), event.getCreationTime());
+ }
+ });
- return indexEventResults;
+ return indexEventResults.collect(Collectors.toList());
}
-
@Override
public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) {
IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(
[2/2] usergrid git commit: more exception monitoring + merge
Posted by sf...@apache.org.
more exception monitoring + merge
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/298f0d8b
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/298f0d8b
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/298f0d8b
Branch: refs/heads/remove-inmemory-event-service
Commit: 298f0d8b833e0c3b0e564690e4358fd4041c46ec
Parents: 32d35e7 dbf37e4
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Oct 13 15:35:31 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Oct 13 15:35:31 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 16 ++++++------
.../core/rx/ExceptionBehaviorTest.java | 26 ++++++++++++++++++++
.../index/impl/IndexRefreshCommandImpl.java | 15 +----------
3 files changed, 35 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/298f0d8b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 0fef974,95126c6..757e3a0
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@@ -279,7 -279,7 +279,8 @@@ public class AmazonAsyncEventService im
}
try {
- //check for empty sets
++
+ //check for empty sets if this is true
boolean validateEmptySets = true;
Observable<IndexOperationMessage> indexoperationObservable;
//merge each operation to a master observable;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/298f0d8b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/ExceptionBehaviorTest.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/ExceptionBehaviorTest.java
index cb39ca1,cb39ca1..7eebc6e
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/ExceptionBehaviorTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/ExceptionBehaviorTest.java
@@@ -27,6 -27,6 +27,7 @@@ import rx.schedulers.Schedulers
import java.util.ArrayList;
import java.util.List;
++import java.util.stream.Collectors;
/**
@@@ -71,6 -71,6 +72,31 @@@ public class ExceptionBehaviorTest
Assert.assertEquals(listReturn,new ArrayList<Integer>());
}
++ @Test()
++ public void testSequence3(){
++ ArrayList listReturn = Observable.range(0, 2)
++ .collect(()->new ArrayList(),(list,i) ->{
++ list.add(i);
++ }).toBlocking().first();
++
++ Assert.assertEquals(listReturn, Observable.range(0, 2).toList().toBlocking().last());
++ }
++
++ @Test(expected = TestException.class)
++ public void testStreamException(){
++ List<Integer> listReturn = new ArrayList<Integer>();
++ listReturn.add(0);
++ listReturn.add(1);
++ listReturn.add(2);
++ listReturn.stream().map(i->{
++ if(i%2 == 0){
++ throw new TestException("test");
++ }
++ return i * 2;
++ }).collect(Collectors.toList());
++ Assert.fail("test");
++ }
++
//
// /**
// * This shows that no re-throw happens on subscribe. This is as designed, but not as expected
http://git-wip-us.apache.org/repos/asf/usergrid/blob/298f0d8b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
index 04f8652,087eefe..bd67fb6
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
@@@ -55,28 -55,28 +55,15 @@@ import rx.Observable
public class IndexRefreshCommandImpl implements IndexRefreshCommand {
private static final Logger logger = LoggerFactory.getLogger( IndexRefreshCommandImpl.class );
-- private final IndexCache indexCache;
private final EsProvider esProvider;
-- private final IndexProducer producer;
-- private final IndexFig indexFig;
private final Timer timer;
@Inject
-- public IndexRefreshCommandImpl(
-- final EsProvider esProvider,
-- final IndexProducer producer,
-- final IndexFig indexFig,
- l final MetricsFactory metricsFactory,
- final MetricsFactory metricsFactory,
-- final IndexCache indexCache ) {
--
++ public IndexRefreshCommandImpl( final EsProvider esProvider,final MetricsFactory metricsFactory) {
this.timer = metricsFactory.getTimer( IndexRefreshCommandImpl.class, "index.refresh" );
--
this.esProvider = esProvider;
-- this.producer = producer;
-- this.indexFig = indexFig;
-- this.indexCache = indexCache;
}