You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/10/21 22:13:01 UTC

[50/50] usergrid git commit: fix inconsistent errors

fix inconsistent errors


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/62f28d40
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/62f28d40
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/62f28d40

Branch: refs/heads/jacoco
Commit: 62f28d40e9eb043a3b03ae5afcc1c2cc73c71d6e
Parents: af11143
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Oct 21 13:33:18 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Oct 21 13:33:18 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 101 +++++++++----------
 .../persistence/queue/LocalQueueManager.java    |   6 +-
 .../persistence/queue/QueueManager.java         |   2 +-
 .../queue/impl/SNSQueueManagerImpl.java         |   8 +-
 .../persistence/queue/QueueManagerTest.java     |  10 +-
 .../services/notifications/QueueListener.java   |   2 +-
 .../services/queues/ImportQueueManager.java     |   5 +-
 .../usergrid/services/queues/QueueListener.java |   3 +-
 8 files changed, 65 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/62f28d40/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index a4dd257..ef03bf8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -229,7 +229,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
         try {
             //signal to SQS
-            this.queue.sendMessages( operations );
+            this.queue.sendMessages(operations);
         } catch (IOException e) {
             throw new RuntimeException("Unable to queue message", e);
         } finally {
@@ -241,7 +241,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
     /**
      * Take message from SQS
      */
-    private Observable<QueueMessage> take() {
+    private List<QueueMessage> take() {
 
         final Timer.Context timer = this.readTimer.time();
 
@@ -291,24 +291,23 @@ public class AmazonAsyncEventService implements AsyncEventService {
             logger.debug("callEventHandlers with {} message", messages.size());
         }
 
-        Stream<IndexEventResult> indexEventResults = messages.stream().map( message -> {
+        Stream<IndexEventResult> indexEventResults = messages.stream().map(message -> {
             AsyncEvent event = null;
             try {
-                event = ( AsyncEvent ) message.getBody();
-            }
-            catch ( ClassCastException cce ) {
-                logger.error( "Failed to deserialize message body", cce );
+                event = (AsyncEvent) message.getBody();
+            } catch (ClassCastException cce) {
+                logger.error("Failed to deserialize message body", cce);
             }
 
-            if ( event == null ) {
-                logger.error( "AsyncEvent type or event is null!" );
-                return new IndexEventResult( Optional.fromNullable( message ), Optional.<IndexOperationMessage>absent(),
-                    System.currentTimeMillis() );
+            if (event == null) {
+                logger.error("AsyncEvent type or event is null!");
+                return new IndexEventResult(Optional.fromNullable(message), Optional.<IndexOperationMessage>absent(),
+                    System.currentTimeMillis());
             }
 
             final AsyncEvent thisEvent = event;
-            if ( logger.isDebugEnabled() ) {
-                logger.debug( "Processing {} event", event );
+            if (logger.isDebugEnabled()) {
+                logger.debug("Processing {} event", event);
             }
 
             try {
@@ -316,55 +315,47 @@ public class AmazonAsyncEventService implements AsyncEventService {
                 boolean validateEmptySets = true;
                 Observable<IndexOperationMessage> indexoperationObservable;
                 //merge each operation to a master observable;
-                if ( event instanceof EdgeDeleteEvent ) {
-                    indexoperationObservable = handleEdgeDelete( message );
-                }
-                else if ( event instanceof EdgeIndexEvent ) {
-                    indexoperationObservable = handleEdgeIndex( message );
-                }
-                else if ( event instanceof EntityDeleteEvent ) {
-                    indexoperationObservable = handleEntityDelete( message );
-                }
-                else if ( event instanceof EntityIndexEvent ) {
-                    indexoperationObservable = handleEntityIndexUpdate( message );
-                }
-                else if ( event instanceof InitializeApplicationIndexEvent ) {
+                if (event instanceof EdgeDeleteEvent) {
+                    indexoperationObservable = handleEdgeDelete(message);
+                } else if (event instanceof EdgeIndexEvent) {
+                    indexoperationObservable = handleEdgeIndex(message);
+                } else if (event instanceof EntityDeleteEvent) {
+                    indexoperationObservable = handleEntityDelete(message);
+                } else if (event instanceof EntityIndexEvent) {
+                    indexoperationObservable = handleEntityIndexUpdate(message);
+                } else if (event instanceof InitializeApplicationIndexEvent) {
                     //does not return observable
-                    handleInitializeApplicationIndex( event, message );
-                    indexoperationObservable = Observable.just( new IndexOperationMessage() );
+                    handleInitializeApplicationIndex(event, message);
+                    indexoperationObservable = Observable.just(new IndexOperationMessage());
                     validateEmptySets = false; //do not check this one for an empty set b/c it will be empty.
-                }
-                else if ( event instanceof ElasticsearchIndexEvent ) {
-                    handleIndexOperation( ( ElasticsearchIndexEvent ) event );
-                    indexoperationObservable = Observable.just( new IndexOperationMessage() );
+                } else if (event instanceof ElasticsearchIndexEvent) {
+                    handleIndexOperation((ElasticsearchIndexEvent) event);
+                    indexoperationObservable = Observable.just(new IndexOperationMessage());
                     validateEmptySets = false; //do not check this one for an empty set b/c it will be empty.
-                }
-
-                else {
-                    throw new Exception( "Unknown EventType" );//TODO: print json instead
+                } else {
+                    throw new Exception("Unknown EventType");//TODO: print json instead
                 }
 
                 //collect all of the
                 IndexOperationMessage indexOperationMessage = indexoperationObservable
-                    .collect( () -> new IndexOperationMessage(), ( collector, single ) -> collector.ingest( single ) )
-                    .toBlocking().lastOrDefault( null );
+                    .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
+                    .toBlocking().lastOrDefault(null);
 
-                if ( validateEmptySets && ( indexOperationMessage == null || indexOperationMessage.isEmpty() ) ) {
-                    logger.error( "Received empty index sequence message:({}), body:({}) ", message.getMessageId(),
-                        message.getStringBody() );
-                    throw new Exception( "Received empty index sequence." );
+                if (validateEmptySets && (indexOperationMessage == null || indexOperationMessage.isEmpty())) {
+                    logger.error("Received empty index sequence message:({}), body:({}) ", message.getMessageId(),
+                        message.getStringBody());
+                    throw new Exception("Received empty index sequence.");
                 }
 
                 //return type that can be indexed and ack'd later
-                return new IndexEventResult( Optional.fromNullable( message ),
-                    Optional.fromNullable( indexOperationMessage ), thisEvent.getCreationTime() );
-            }
-            catch ( Exception e ) {
-                logger.error( "Failed to index message: " + message.getMessageId(), message.getStringBody(), e );
-                return new IndexEventResult( Optional.absent(), Optional.<IndexOperationMessage>absent(),
+                return new IndexEventResult(Optional.fromNullable(message),
+                    Optional.fromNullable(indexOperationMessage), thisEvent.getCreationTime());
+            } catch (Exception e) {
+                logger.error("Failed to index message: " + message.getMessageId(), message.getStringBody(), e);
+                return new IndexEventResult(Optional.absent(), Optional.<IndexOperationMessage>absent(),
                     event.getCreationTime());
-                }
-            });
+            }
+        });
 
 
         return indexEventResults.collect(Collectors.toList());
@@ -373,7 +364,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
     @Override
     public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) {
         IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(
-            applicationScope );
+            applicationScope);
         offerTopic( new InitializeApplicationIndexEvent( queueFig.getPrimaryRegion(),
             new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ) );
     }
@@ -442,7 +433,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
         final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
 
         final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load( edgeIndexEvent.getEntityId() ).flatMap(
-            entity -> eventBuilder.buildNewEdge( applicationScope, entity, edge));
+            entity -> eventBuilder.buildNewEdge(applicationScope, entity, edge));
         return edgeIndexObservable;
     }
 
@@ -470,7 +461,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
         if (logger.isDebugEnabled()) logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge);
 
-        final Observable<IndexOperationMessage> observable = eventBuilder.buildDeleteEdge( applicationScope, edge );
+        final Observable<IndexOperationMessage> observable = eventBuilder.buildDeleteEdge(applicationScope, edge);
         return observable;
     }
 
@@ -647,9 +638,9 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
                             do {
                                 try {
-                                    drainList = take().toList().toBlocking().lastOrDefault( null );
+                                    drainList = take();
                                     //emit our list in it's entity to hand off to a worker pool
-                                    subscriber.onNext( drainList );
+                                        subscriber.onNext(drainList);
 
                                     //take since  we're in flight
                                     inFlight.addAndGet( drainList.size() );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/62f28d40/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
index 8be6099..d5f6858 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
@@ -40,7 +40,7 @@ public class LocalQueueManager implements QueueManager {
     public ArrayBlockingQueue<QueueMessage> queue = new ArrayBlockingQueue<>(10000);
 
     @Override
-    public    Observable<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) {
+    public    List<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) {
         List<QueueMessage> returnQueue = new ArrayList<>();
         try {
             QueueMessage message=null;
@@ -54,7 +54,7 @@ public class LocalQueueManager implements QueueManager {
         }catch (InterruptedException ie){
             throw new RuntimeException(ie);
         }
-        return Observable.from( returnQueue);
+        return returnQueue;
     }
 
     @Override
@@ -87,7 +87,7 @@ public class LocalQueueManager implements QueueManager {
     public <T extends Serializable> void sendMessage( final T body ) throws IOException {
         String uuid = UUID.randomUUID().toString();
         try {
-            queue.put(new QueueMessage(uuid, "handle_" + uuid, body, "put type here"));
+            queue.offer(new QueueMessage(uuid, "handle_" + uuid, body, "put type here"),5000,TimeUnit.MILLISECONDS);
         }catch (InterruptedException ie){
             throw new RuntimeException(ie);
         }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/62f28d40/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
index 34a3654..4c948e3 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
@@ -36,7 +36,7 @@ public interface QueueManager {
      * @param klass class to cast the return from
      * @return List of Queue Messages
      */
-    Observable<QueueMessage> getMessages(int limit,int transactionTimeout, int waitTime, Class klass);
+    List<QueueMessage> getMessages(int limit,int transactionTimeout, int waitTime, Class klass);
 
     /**
      * get the queue depth

http://git-wip-us.apache.org/repos/asf/usergrid/blob/62f28d40/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index bc5f2f1..31b478d 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -399,12 +399,12 @@ public class SNSQueueManagerImpl implements QueueManager {
 
 
     @Override
-    public rx.Observable<QueueMessage> getMessages( final int limit, final int transactionTimeout, final int waitTime,
+    public List<QueueMessage> getMessages( final int limit, final int transactionTimeout, final int waitTime,
                                                     final Class klass ) {
 
         if ( sqs == null ) {
             logger.error( "SQS is null - was not initialized properly" );
-            return rx.Observable.empty();
+            return new ArrayList<>(0);
         }
 
         String url = getReadQueue().getUrl();
@@ -462,7 +462,7 @@ public class SNSQueueManagerImpl implements QueueManager {
                 queueMessages.add( queueMessage );
             }
 
-            return rx.Observable.from( queueMessages );
+            return  queueMessages ;
         }
         catch ( com.amazonaws.services.sqs.model.QueueDoesNotExistException dne ) {
             logger.error( String.format( "Queue does not exist! [%s]", url ), dne );
@@ -471,7 +471,7 @@ public class SNSQueueManagerImpl implements QueueManager {
             logger.error( String.format( "Programming error getting messages from queue=[%s] exist!", url ), e );
         }
 
-        return rx.Observable.from( new ArrayList<>( 0 ) );
+        return  new ArrayList<>( 0 ) ;
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/62f28d40/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
index ac70af6..c8661c0 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
@@ -82,14 +82,14 @@ public class QueueManagerTest {
     public void send() throws Exception{
         String value = "bodytest";
         qm.sendMessage(value);
-        List<QueueMessage> messageList = qm.getMessages(1,5000,5000,String.class).toList().toBlocking().last();
+        List<QueueMessage> messageList = qm.getMessages(1,5000,5000,String.class);
         assertTrue(messageList.size() >= 1);
         for(QueueMessage message : messageList){
             assertTrue(message.getBody().equals(value));
             qm.commitMessage(message);
         }
 
-        messageList = qm.getMessages(1,5000,5000,String.class).toList().toBlocking().last();
+        messageList = qm.getMessages(1,5000,5000,String.class);
         assertTrue(messageList.size() <= 0);
 
     }
@@ -102,14 +102,14 @@ public class QueueManagerTest {
         List<Map<String,String>> bodies = new ArrayList<>();
         bodies.add(values);
         qm.sendMessages(bodies);
-        List<QueueMessage> messageList = qm.getMessages(1,5000,5000,values.getClass()).toList().toBlocking().last();
+        List<QueueMessage> messageList = qm.getMessages(1,5000,5000,values.getClass());
         assertTrue(messageList.size() >= 1);
         for(QueueMessage message : messageList){
             assertTrue(message.getBody().equals(values));
         }
         qm.commitMessages(messageList);
 
-        messageList = qm.getMessages(1,5000,5000,values.getClass()).toList().toBlocking().last();
+        messageList = qm.getMessages(1,5000,5000,values.getClass());
         assertTrue(messageList.size() <= 0);
 
     }
@@ -133,7 +133,7 @@ public class QueueManagerTest {
         }
         assertTrue(depth>0);
 
-        List<QueueMessage> messageList = qm.getMessages(10,5000,5000,values.getClass()).toList().toBlocking().last();
+        List<QueueMessage> messageList = qm.getMessages(10,5000,5000,values.getClass());
         assertTrue(messageList.size() <= 500);
         for(QueueMessage message : messageList){
             assertTrue(message.getBody().equals(values));

http://git-wip-us.apache.org/repos/asf/usergrid/blob/62f28d40/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 7e271c9..bcf7b49 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -154,7 +154,7 @@ public class QueueListener  {
         while ( true ) {
 
                 Timer.Context timerContext = timer.time();
-                queueManager.getMessages(getBatchSize(), MESSAGE_TRANSACTION_TIMEOUT, 10000, ApplicationQueueMessage.class)
+                rx.Observable.from(queueManager.getMessages(getBatchSize(), MESSAGE_TRANSACTION_TIMEOUT, 10000, ApplicationQueueMessage.class))
                     .buffer(getBatchSize())
                     .doOnNext(messages -> {
                         try {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/62f28d40/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
index bc55ff4..272bb65 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
@@ -23,6 +23,7 @@ package org.apache.usergrid.services.queues;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.usergrid.persistence.queue.QueueManager;
@@ -36,9 +37,9 @@ import rx.Observable;
 public class ImportQueueManager implements QueueManager {
 
     @Override
-    public Observable<QueueMessage> getMessages( final int limit, final int transactionTimeout, final int waitTime,
+    public List<QueueMessage> getMessages( final int limit, final int transactionTimeout, final int waitTime,
                                            final Class klass ) {
-        return Observable.empty();
+        return new ArrayList<>();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/usergrid/blob/62f28d40/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
index 36e347e..5404e6b 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
@@ -30,6 +30,7 @@ import org.apache.usergrid.services.ServiceManager;
 import org.apache.usergrid.services.ServiceManagerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import rx.*;
 
 import javax.annotation.PostConstruct;
 import java.util.*;
@@ -172,7 +173,7 @@ public abstract class QueueListener  {
                 Timer.Context timerContext = timer.time();
                 //Get the messages out of the queue.
                 //TODO: a model class to get generic queueMessages out of the queueManager. Ask Shawn what should go here.
-                queueManager.getMessages(getBatchSize(), MESSAGE_TRANSACTION_TIMEOUT, 5000, ImportQueueMessage.class)
+                rx.Observable.from( queueManager.getMessages(getBatchSize(), MESSAGE_TRANSACTION_TIMEOUT, 5000, ImportQueueMessage.class))
                     .buffer(getBatchSize())
                     .doOnNext(messages -> {
                         try {