You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/03/01 03:14:33 UTC

usergrid git commit: Fix serialization issues with access tokens and simplify the Queue Manager getMessages interface.

Repository: usergrid
Updated Branches:
  refs/heads/release-2.1.1 bfd2bb34a -> bcc978088


Fix serialization issues with access tokens and simplify the Queue Manager getMessages interface.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/bcc97808
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/bcc97808
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/bcc97808

Branch: refs/heads/release-2.1.1
Commit: bcc97808864c2bb9de9d39b5243592b818e90d5b
Parents: bfd2bb3
Author: Michael Russo <mr...@apigee.com>
Authored: Mon Feb 29 18:14:27 2016 -0800
Committer: Michael Russo <mr...@apigee.com>
Committed: Mon Feb 29 18:14:27 2016 -0800

----------------------------------------------------------------------
 .../asyncevents/AsyncEventServiceImpl.java              |  7 ++-----
 .../usergrid/persistence/queue/LocalQueueManager.java   |  6 +-----
 .../org/apache/usergrid/persistence/queue/QueueFig.java |  4 ++--
 .../apache/usergrid/persistence/queue/QueueManager.java |  6 +-----
 .../persistence/queue/impl/SNSQueueManagerImpl.java     | 12 +++++++-----
 .../usergrid/persistence/queue/QueueManagerTest.java    | 10 +++++-----
 .../shiro/credentials/ApplicationAccessToken.java       |  3 +++
 .../shiro/credentials/OrganizationAccessToken.java      |  3 +++
 .../usergrid/services/notifications/QueueListener.java  |  2 +-
 .../usergrid/services/queues/ImportQueueManager.java    |  4 +---
 .../apache/usergrid/services/queues/QueueListener.java  |  2 +-
 11 files changed, 27 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/bcc97808/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 782bed7..3f623d8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -242,13 +242,10 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         final Timer.Context timer = this.readTimer.time();
 
         try {
-            return queue.getMessages(MAX_TAKE,
-                    indexProcessorFig.getIndexQueueVisibilityTimeout(),
-                    Math.max(1000, queueFig.getQueueClientSocketTimeout() - 1000), // 1 sec less than socket timeout
-                    AsyncEvent.class);
+            return queue.getMessages(MAX_TAKE, AsyncEvent.class);
         }
-        //stop our timer
         finally {
+            //stop our timer
             timer.stop();
         }
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/bcc97808/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
index 4d26100..1f4261a 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
@@ -22,17 +22,13 @@ package org.apache.usergrid.persistence.queue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import rx.Observable;
 
 import java.io.IOException;
-import java.util.AbstractQueue;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -45,7 +41,7 @@ public class LocalQueueManager implements QueueManager {
     public ArrayBlockingQueue<QueueMessage> queue = new ArrayBlockingQueue<>(10000);
 
     @Override
-    public    List<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) {
+    public    List<QueueMessage> getMessages(int limit, Class klass) {
         List<QueueMessage> returnQueue = new ArrayList<>();
         try {
             QueueMessage message=null;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/bcc97808/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
index 7757d58..63d2d80 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
@@ -88,11 +88,11 @@ public interface QueueFig extends GuicyFig {
     int getLocalQuorumTimeout();
 
     @Key( "usergrid.queue.client.connection.timeout")
-    @Default( "1000" ) // 3 seconds
+    @Default( "1000" ) // 1 second
     int getQueueClientConnectionTimeout();
 
     @Key( "usergrid.queue.client.socket.timeout")
-    @Default( "3000" ) // 3 seconds
+    @Default( "10000" ) // 10 seconds
     int getQueueClientSocketTimeout();
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/bcc97808/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
index 4c948e3..18909e4 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
@@ -17,8 +17,6 @@
  */
 package org.apache.usergrid.persistence.queue;
 
-import rx.Observable;
-
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
@@ -31,12 +29,10 @@ public interface QueueManager {
     /**
      * Read messages from queue
      * @param limit
-     * @param transactionTimeout timeout in seconds
-     * @param waitTime wait time for next message in milliseconds
      * @param klass class to cast the return from
      * @return List of Queue Messages
      */
-    List<QueueMessage> getMessages(int limit,int transactionTimeout, int waitTime, Class klass);
+    List<QueueMessage> getMessages(int limit, Class klass);
 
     /**
      * get the queue depth

http://git-wip-us.apache.org/repos/asf/usergrid/blob/bcc97808/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index f1d8c5a..4dd9bda 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -169,7 +169,8 @@ public class SNSQueueManagerImpl implements QueueManager {
 
         this.clientConfiguration = new ClientConfiguration()
             .withConnectionTimeout(queueFig.getQueueClientConnectionTimeout())
-            .withSocketTimeout(queueFig.getQueueClientSocketTimeout())
+            // don't let the socket timeout be configured less than 5 sec (network delays do happen)
+            .withSocketTimeout(Math.max(5000, queueFig.getQueueClientSocketTimeout()))
             .withGzip(true);
 
         try {
@@ -411,8 +412,7 @@ public class SNSQueueManagerImpl implements QueueManager {
 
 
     @Override
-    public List<QueueMessage> getMessages( final int limit, final int transactionTimeout, final int waitTime,
-                                                    final Class klass ) {
+    public List<QueueMessage> getMessages(final int limit, final Class klass) {
 
         if ( sqs == null ) {
             logger.error( "SQS is null - was not initialized properly" );
@@ -427,8 +427,10 @@ public class SNSQueueManagerImpl implements QueueManager {
 
         ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest( url );
         receiveMessageRequest.setMaxNumberOfMessages( limit );
-        receiveMessageRequest.setVisibilityTimeout( Math.max( 1, transactionTimeout / 1000 ) );
-        receiveMessageRequest.setWaitTimeSeconds( waitTime / 1000 );
+        receiveMessageRequest.setVisibilityTimeout( Math.max( 1, fig.getVisibilityTimeout() / 1000 ) );
+
+        // set SQS long polling to 3 secs less than the client socket timeout (to allow for network delays), with a min of 0 (no long poll)
+        receiveMessageRequest.setWaitTimeSeconds( Math.max(0, ( fig.getQueueClientSocketTimeout() - 3000) / 1000 ) );
 
         try {
             ReceiveMessageResult result = sqs.receiveMessage( receiveMessageRequest );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/bcc97808/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
index c8661c0..d57beab 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
@@ -82,14 +82,14 @@ public class QueueManagerTest {
     public void send() throws Exception{
         String value = "bodytest";
         qm.sendMessage(value);
-        List<QueueMessage> messageList = qm.getMessages(1,5000,5000,String.class);
+        List<QueueMessage> messageList = qm.getMessages(1, String.class);
         assertTrue(messageList.size() >= 1);
         for(QueueMessage message : messageList){
             assertTrue(message.getBody().equals(value));
             qm.commitMessage(message);
         }
 
-        messageList = qm.getMessages(1,5000,5000,String.class);
+        messageList = qm.getMessages(1, String.class);
         assertTrue(messageList.size() <= 0);
 
     }
@@ -102,14 +102,14 @@ public class QueueManagerTest {
         List<Map<String,String>> bodies = new ArrayList<>();
         bodies.add(values);
         qm.sendMessages(bodies);
-        List<QueueMessage> messageList = qm.getMessages(1,5000,5000,values.getClass());
+        List<QueueMessage> messageList = qm.getMessages(1, values.getClass());
         assertTrue(messageList.size() >= 1);
         for(QueueMessage message : messageList){
             assertTrue(message.getBody().equals(values));
         }
         qm.commitMessages(messageList);
 
-        messageList = qm.getMessages(1,5000,5000,values.getClass());
+        messageList = qm.getMessages(1, values.getClass());
         assertTrue(messageList.size() <= 0);
 
     }
@@ -133,7 +133,7 @@ public class QueueManagerTest {
         }
         assertTrue(depth>0);
 
-        List<QueueMessage> messageList = qm.getMessages(10,5000,5000,values.getClass());
+        List<QueueMessage> messageList = qm.getMessages(10, values.getClass());
         assertTrue(messageList.size() <= 500);
         for(QueueMessage message : messageList){
             assertTrue(message.getBody().equals(values));

http://git-wip-us.apache.org/repos/asf/usergrid/blob/bcc97808/stack/services/src/main/java/org/apache/usergrid/security/shiro/credentials/ApplicationAccessToken.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/security/shiro/credentials/ApplicationAccessToken.java b/stack/services/src/main/java/org/apache/usergrid/security/shiro/credentials/ApplicationAccessToken.java
index cbfe97d..91d95d2 100644
--- a/stack/services/src/main/java/org/apache/usergrid/security/shiro/credentials/ApplicationAccessToken.java
+++ b/stack/services/src/main/java/org/apache/usergrid/security/shiro/credentials/ApplicationAccessToken.java
@@ -19,6 +19,9 @@ package org.apache.usergrid.security.shiro.credentials;
 
 public class ApplicationAccessToken extends AbstractAccessTokenCredentials implements ApplicationCredentials {
 
+    // Do not remove, needed for Jackson to handle deserialization
+    public ApplicationAccessToken(){}
+
     public ApplicationAccessToken( String token ) {
         super( token );
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/bcc97808/stack/services/src/main/java/org/apache/usergrid/security/shiro/credentials/OrganizationAccessToken.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/security/shiro/credentials/OrganizationAccessToken.java b/stack/services/src/main/java/org/apache/usergrid/security/shiro/credentials/OrganizationAccessToken.java
index 3ae6f2c..36e78c6 100644
--- a/stack/services/src/main/java/org/apache/usergrid/security/shiro/credentials/OrganizationAccessToken.java
+++ b/stack/services/src/main/java/org/apache/usergrid/security/shiro/credentials/OrganizationAccessToken.java
@@ -19,6 +19,9 @@ package org.apache.usergrid.security.shiro.credentials;
 
 public class OrganizationAccessToken extends AbstractAccessTokenCredentials implements OrganizationCredentials {
 
+    // Do not remove, needed for Jackson to handle deserialization
+    public OrganizationAccessToken(){}
+
     public OrganizationAccessToken( String token ) {
         super( token );
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/bcc97808/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 0a0e982..de9cf06 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -169,7 +169,7 @@ public class QueueListener  {
         while ( true ) {
 
                 Timer.Context timerContext = timer.time();
-                rx.Observable.from(queueManager.getMessages(getBatchSize(), MESSAGE_TRANSACTION_TIMEOUT, 10000, ApplicationQueueMessage.class))
+                rx.Observable.from(queueManager.getMessages(getBatchSize(), ApplicationQueueMessage.class))
                     .buffer(getBatchSize())
                     .doOnNext(messages -> {
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/bcc97808/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
index 272bb65..f3c65c7 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
@@ -28,7 +28,6 @@ import java.util.List;
 
 import org.apache.usergrid.persistence.queue.QueueManager;
 import org.apache.usergrid.persistence.queue.QueueMessage;
-import rx.Observable;
 
 
 /**
@@ -37,8 +36,7 @@ import rx.Observable;
 public class ImportQueueManager implements QueueManager {
 
     @Override
-    public List<QueueMessage> getMessages( final int limit, final int transactionTimeout, final int waitTime,
-                                           final Class klass ) {
+    public List<QueueMessage> getMessages(final int limit, final Class klass) {
         return new ArrayList<>();
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/bcc97808/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
index 7ceb2ae..5895d38 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
@@ -181,7 +181,7 @@ public abstract class QueueListener  {
                 Timer.Context timerContext = timer.time();
                 //Get the messages out of the queue.
                 //TODO: a model class to get generic queueMessages out of the queueManager. Ask Shawn what should go here.
-                rx.Observable.from( queueManager.getMessages(getBatchSize(), MESSAGE_TRANSACTION_TIMEOUT, 5000, ImportQueueMessage.class))
+                rx.Observable.from( queueManager.getMessages(getBatchSize(), ImportQueueMessage.class))
                     .buffer(getBatchSize())
                     .doOnNext(messages -> {
                         try {