You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/10/21 22:13:01 UTC
[50/50] usergrid git commit: fix inconsistent errors
fix inconsistent errors
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/62f28d40
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/62f28d40
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/62f28d40
Branch: refs/heads/jacoco
Commit: 62f28d40e9eb043a3b03ae5afcc1c2cc73c71d6e
Parents: af11143
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Oct 21 13:33:18 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Oct 21 13:33:18 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 101 +++++++++----------
.../persistence/queue/LocalQueueManager.java | 6 +-
.../persistence/queue/QueueManager.java | 2 +-
.../queue/impl/SNSQueueManagerImpl.java | 8 +-
.../persistence/queue/QueueManagerTest.java | 10 +-
.../services/notifications/QueueListener.java | 2 +-
.../services/queues/ImportQueueManager.java | 5 +-
.../usergrid/services/queues/QueueListener.java | 3 +-
8 files changed, 65 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/62f28d40/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index a4dd257..ef03bf8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -229,7 +229,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
try {
//signal to SQS
- this.queue.sendMessages( operations );
+ this.queue.sendMessages(operations);
} catch (IOException e) {
throw new RuntimeException("Unable to queue message", e);
} finally {
@@ -241,7 +241,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
/**
* Take message from SQS
*/
- private Observable<QueueMessage> take() {
+ private List<QueueMessage> take() {
final Timer.Context timer = this.readTimer.time();
@@ -291,24 +291,23 @@ public class AmazonAsyncEventService implements AsyncEventService {
logger.debug("callEventHandlers with {} message", messages.size());
}
- Stream<IndexEventResult> indexEventResults = messages.stream().map( message -> {
+ Stream<IndexEventResult> indexEventResults = messages.stream().map(message -> {
AsyncEvent event = null;
try {
- event = ( AsyncEvent ) message.getBody();
- }
- catch ( ClassCastException cce ) {
- logger.error( "Failed to deserialize message body", cce );
+ event = (AsyncEvent) message.getBody();
+ } catch (ClassCastException cce) {
+ logger.error("Failed to deserialize message body", cce);
}
- if ( event == null ) {
- logger.error( "AsyncEvent type or event is null!" );
- return new IndexEventResult( Optional.fromNullable( message ), Optional.<IndexOperationMessage>absent(),
- System.currentTimeMillis() );
+ if (event == null) {
+ logger.error("AsyncEvent type or event is null!");
+ return new IndexEventResult(Optional.fromNullable(message), Optional.<IndexOperationMessage>absent(),
+ System.currentTimeMillis());
}
final AsyncEvent thisEvent = event;
- if ( logger.isDebugEnabled() ) {
- logger.debug( "Processing {} event", event );
+ if (logger.isDebugEnabled()) {
+ logger.debug("Processing {} event", event);
}
try {
@@ -316,55 +315,47 @@ public class AmazonAsyncEventService implements AsyncEventService {
boolean validateEmptySets = true;
Observable<IndexOperationMessage> indexoperationObservable;
//merge each operation to a master observable;
- if ( event instanceof EdgeDeleteEvent ) {
- indexoperationObservable = handleEdgeDelete( message );
- }
- else if ( event instanceof EdgeIndexEvent ) {
- indexoperationObservable = handleEdgeIndex( message );
- }
- else if ( event instanceof EntityDeleteEvent ) {
- indexoperationObservable = handleEntityDelete( message );
- }
- else if ( event instanceof EntityIndexEvent ) {
- indexoperationObservable = handleEntityIndexUpdate( message );
- }
- else if ( event instanceof InitializeApplicationIndexEvent ) {
+ if (event instanceof EdgeDeleteEvent) {
+ indexoperationObservable = handleEdgeDelete(message);
+ } else if (event instanceof EdgeIndexEvent) {
+ indexoperationObservable = handleEdgeIndex(message);
+ } else if (event instanceof EntityDeleteEvent) {
+ indexoperationObservable = handleEntityDelete(message);
+ } else if (event instanceof EntityIndexEvent) {
+ indexoperationObservable = handleEntityIndexUpdate(message);
+ } else if (event instanceof InitializeApplicationIndexEvent) {
//does not return observable
- handleInitializeApplicationIndex( event, message );
- indexoperationObservable = Observable.just( new IndexOperationMessage() );
+ handleInitializeApplicationIndex(event, message);
+ indexoperationObservable = Observable.just(new IndexOperationMessage());
validateEmptySets = false; //do not check this one for an empty set b/c it will be empty.
- }
- else if ( event instanceof ElasticsearchIndexEvent ) {
- handleIndexOperation( ( ElasticsearchIndexEvent ) event );
- indexoperationObservable = Observable.just( new IndexOperationMessage() );
+ } else if (event instanceof ElasticsearchIndexEvent) {
+ handleIndexOperation((ElasticsearchIndexEvent) event);
+ indexoperationObservable = Observable.just(new IndexOperationMessage());
validateEmptySets = false; //do not check this one for an empty set b/c it will be empty.
- }
-
- else {
- throw new Exception( "Unknown EventType" );//TODO: print json instead
+ } else {
+ throw new Exception("Unknown EventType");//TODO: print json instead
}
//collect all of the emitted index operations into a single IndexOperationMessage
IndexOperationMessage indexOperationMessage = indexoperationObservable
- .collect( () -> new IndexOperationMessage(), ( collector, single ) -> collector.ingest( single ) )
- .toBlocking().lastOrDefault( null );
+ .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
+ .toBlocking().lastOrDefault(null);
- if ( validateEmptySets && ( indexOperationMessage == null || indexOperationMessage.isEmpty() ) ) {
- logger.error( "Received empty index sequence message:({}), body:({}) ", message.getMessageId(),
- message.getStringBody() );
- throw new Exception( "Received empty index sequence." );
+ if (validateEmptySets && (indexOperationMessage == null || indexOperationMessage.isEmpty())) {
+ logger.error("Received empty index sequence message:({}), body:({}) ", message.getMessageId(),
+ message.getStringBody());
+ throw new Exception("Received empty index sequence.");
}
//return type that can be indexed and ack'd later
- return new IndexEventResult( Optional.fromNullable( message ),
- Optional.fromNullable( indexOperationMessage ), thisEvent.getCreationTime() );
- }
- catch ( Exception e ) {
- logger.error( "Failed to index message: " + message.getMessageId(), message.getStringBody(), e );
- return new IndexEventResult( Optional.absent(), Optional.<IndexOperationMessage>absent(),
+ return new IndexEventResult(Optional.fromNullable(message),
+ Optional.fromNullable(indexOperationMessage), thisEvent.getCreationTime());
+ } catch (Exception e) {
+ logger.error("Failed to index message: " + message.getMessageId(), message.getStringBody(), e);
+ return new IndexEventResult(Optional.absent(), Optional.<IndexOperationMessage>absent(),
event.getCreationTime());
- }
- });
+ }
+ });
return indexEventResults.collect(Collectors.toList());
@@ -373,7 +364,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
@Override
public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) {
IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(
- applicationScope );
+ applicationScope);
offerTopic( new InitializeApplicationIndexEvent( queueFig.getPrimaryRegion(),
new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ) );
}
@@ -442,7 +433,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load( edgeIndexEvent.getEntityId() ).flatMap(
- entity -> eventBuilder.buildNewEdge( applicationScope, entity, edge));
+ entity -> eventBuilder.buildNewEdge(applicationScope, entity, edge));
return edgeIndexObservable;
}
@@ -470,7 +461,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
if (logger.isDebugEnabled()) logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge);
- final Observable<IndexOperationMessage> observable = eventBuilder.buildDeleteEdge( applicationScope, edge );
+ final Observable<IndexOperationMessage> observable = eventBuilder.buildDeleteEdge(applicationScope, edge);
return observable;
}
@@ -647,9 +638,9 @@ public class AmazonAsyncEventService implements AsyncEventService {
do {
try {
- drainList = take().toList().toBlocking().lastOrDefault( null );
+ drainList = take();
//emit our list in its entirety to hand off to a worker pool
- subscriber.onNext( drainList );
+ subscriber.onNext(drainList);
//take since we're in flight
inFlight.addAndGet( drainList.size() );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/62f28d40/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
index 8be6099..d5f6858 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
@@ -40,7 +40,7 @@ public class LocalQueueManager implements QueueManager {
public ArrayBlockingQueue<QueueMessage> queue = new ArrayBlockingQueue<>(10000);
@Override
- public Observable<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) {
+ public List<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) {
List<QueueMessage> returnQueue = new ArrayList<>();
try {
QueueMessage message=null;
@@ -54,7 +54,7 @@ public class LocalQueueManager implements QueueManager {
}catch (InterruptedException ie){
throw new RuntimeException(ie);
}
- return Observable.from( returnQueue);
+ return returnQueue;
}
@Override
@@ -87,7 +87,7 @@ public class LocalQueueManager implements QueueManager {
public <T extends Serializable> void sendMessage( final T body ) throws IOException {
String uuid = UUID.randomUUID().toString();
try {
- queue.put(new QueueMessage(uuid, "handle_" + uuid, body, "put type here"));
+ queue.offer(new QueueMessage(uuid, "handle_" + uuid, body, "put type here"),5000,TimeUnit.MILLISECONDS);
}catch (InterruptedException ie){
throw new RuntimeException(ie);
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/62f28d40/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
index 34a3654..4c948e3 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
@@ -36,7 +36,7 @@ public interface QueueManager {
* @param klass class to cast the return from
* @return List of Queue Messages
*/
- Observable<QueueMessage> getMessages(int limit,int transactionTimeout, int waitTime, Class klass);
+ List<QueueMessage> getMessages(int limit,int transactionTimeout, int waitTime, Class klass);
/**
* get the queue depth
http://git-wip-us.apache.org/repos/asf/usergrid/blob/62f28d40/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index bc5f2f1..31b478d 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -399,12 +399,12 @@ public class SNSQueueManagerImpl implements QueueManager {
@Override
- public rx.Observable<QueueMessage> getMessages( final int limit, final int transactionTimeout, final int waitTime,
+ public List<QueueMessage> getMessages( final int limit, final int transactionTimeout, final int waitTime,
final Class klass ) {
if ( sqs == null ) {
logger.error( "SQS is null - was not initialized properly" );
- return rx.Observable.empty();
+ return new ArrayList<>(0);
}
String url = getReadQueue().getUrl();
@@ -462,7 +462,7 @@ public class SNSQueueManagerImpl implements QueueManager {
queueMessages.add( queueMessage );
}
- return rx.Observable.from( queueMessages );
+ return queueMessages ;
}
catch ( com.amazonaws.services.sqs.model.QueueDoesNotExistException dne ) {
logger.error( String.format( "Queue does not exist! [%s]", url ), dne );
@@ -471,7 +471,7 @@ public class SNSQueueManagerImpl implements QueueManager {
logger.error( String.format( "Programming error getting messages from queue=[%s] exist!", url ), e );
}
- return rx.Observable.from( new ArrayList<>( 0 ) );
+ return new ArrayList<>( 0 ) ;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/62f28d40/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
index ac70af6..c8661c0 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
@@ -82,14 +82,14 @@ public class QueueManagerTest {
public void send() throws Exception{
String value = "bodytest";
qm.sendMessage(value);
- List<QueueMessage> messageList = qm.getMessages(1,5000,5000,String.class).toList().toBlocking().last();
+ List<QueueMessage> messageList = qm.getMessages(1,5000,5000,String.class);
assertTrue(messageList.size() >= 1);
for(QueueMessage message : messageList){
assertTrue(message.getBody().equals(value));
qm.commitMessage(message);
}
- messageList = qm.getMessages(1,5000,5000,String.class).toList().toBlocking().last();
+ messageList = qm.getMessages(1,5000,5000,String.class);
assertTrue(messageList.size() <= 0);
}
@@ -102,14 +102,14 @@ public class QueueManagerTest {
List<Map<String,String>> bodies = new ArrayList<>();
bodies.add(values);
qm.sendMessages(bodies);
- List<QueueMessage> messageList = qm.getMessages(1,5000,5000,values.getClass()).toList().toBlocking().last();
+ List<QueueMessage> messageList = qm.getMessages(1,5000,5000,values.getClass());
assertTrue(messageList.size() >= 1);
for(QueueMessage message : messageList){
assertTrue(message.getBody().equals(values));
}
qm.commitMessages(messageList);
- messageList = qm.getMessages(1,5000,5000,values.getClass()).toList().toBlocking().last();
+ messageList = qm.getMessages(1,5000,5000,values.getClass());
assertTrue(messageList.size() <= 0);
}
@@ -133,7 +133,7 @@ public class QueueManagerTest {
}
assertTrue(depth>0);
- List<QueueMessage> messageList = qm.getMessages(10,5000,5000,values.getClass()).toList().toBlocking().last();
+ List<QueueMessage> messageList = qm.getMessages(10,5000,5000,values.getClass());
assertTrue(messageList.size() <= 500);
for(QueueMessage message : messageList){
assertTrue(message.getBody().equals(values));
http://git-wip-us.apache.org/repos/asf/usergrid/blob/62f28d40/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 7e271c9..bcf7b49 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -154,7 +154,7 @@ public class QueueListener {
while ( true ) {
Timer.Context timerContext = timer.time();
- queueManager.getMessages(getBatchSize(), MESSAGE_TRANSACTION_TIMEOUT, 10000, ApplicationQueueMessage.class)
+ rx.Observable.from(queueManager.getMessages(getBatchSize(), MESSAGE_TRANSACTION_TIMEOUT, 10000, ApplicationQueueMessage.class))
.buffer(getBatchSize())
.doOnNext(messages -> {
try {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/62f28d40/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
index bc55ff4..272bb65 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
@@ -23,6 +23,7 @@ package org.apache.usergrid.services.queues;
import java.io.IOException;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.List;
import org.apache.usergrid.persistence.queue.QueueManager;
@@ -36,9 +37,9 @@ import rx.Observable;
public class ImportQueueManager implements QueueManager {
@Override
- public Observable<QueueMessage> getMessages( final int limit, final int transactionTimeout, final int waitTime,
+ public List<QueueMessage> getMessages( final int limit, final int transactionTimeout, final int waitTime,
final Class klass ) {
- return Observable.empty();
+ return new ArrayList<>();
}
@Override
http://git-wip-us.apache.org/repos/asf/usergrid/blob/62f28d40/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
index 36e347e..5404e6b 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
@@ -30,6 +30,7 @@ import org.apache.usergrid.services.ServiceManager;
import org.apache.usergrid.services.ServiceManagerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import rx.*;
import javax.annotation.PostConstruct;
import java.util.*;
@@ -172,7 +173,7 @@ public abstract class QueueListener {
Timer.Context timerContext = timer.time();
//Get the messages out of the queue.
//TODO: a model class to get generic queueMessages out of the queueManager. Ask Shawn what should go here.
- queueManager.getMessages(getBatchSize(), MESSAGE_TRANSACTION_TIMEOUT, 5000, ImportQueueMessage.class)
+ rx.Observable.from( queueManager.getMessages(getBatchSize(), MESSAGE_TRANSACTION_TIMEOUT, 5000, ImportQueueMessage.class))
.buffer(getBatchSize())
.doOnNext(messages -> {
try {