You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/03/01 03:14:33 UTC
usergrid git commit: Fix serialization issues with access tokens and
simplify the Queue Manager getMessages interface.
Repository: usergrid
Updated Branches:
refs/heads/release-2.1.1 bfd2bb34a -> bcc978088
Fix serialization issues with access tokens and simplify the Queue Manager getMessages interface.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/bcc97808
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/bcc97808
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/bcc97808
Branch: refs/heads/release-2.1.1
Commit: bcc97808864c2bb9de9d39b5243592b818e90d5b
Parents: bfd2bb3
Author: Michael Russo <mr...@apigee.com>
Authored: Mon Feb 29 18:14:27 2016 -0800
Committer: Michael Russo <mr...@apigee.com>
Committed: Mon Feb 29 18:14:27 2016 -0800
----------------------------------------------------------------------
.../asyncevents/AsyncEventServiceImpl.java | 7 ++-----
.../usergrid/persistence/queue/LocalQueueManager.java | 6 +-----
.../org/apache/usergrid/persistence/queue/QueueFig.java | 4 ++--
.../apache/usergrid/persistence/queue/QueueManager.java | 6 +-----
.../persistence/queue/impl/SNSQueueManagerImpl.java | 12 +++++++-----
.../usergrid/persistence/queue/QueueManagerTest.java | 10 +++++-----
.../shiro/credentials/ApplicationAccessToken.java | 3 +++
.../shiro/credentials/OrganizationAccessToken.java | 3 +++
.../usergrid/services/notifications/QueueListener.java | 2 +-
.../usergrid/services/queues/ImportQueueManager.java | 4 +---
.../apache/usergrid/services/queues/QueueListener.java | 2 +-
11 files changed, 27 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/bcc97808/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 782bed7..3f623d8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -242,13 +242,10 @@ public class AsyncEventServiceImpl implements AsyncEventService {
final Timer.Context timer = this.readTimer.time();
try {
- return queue.getMessages(MAX_TAKE,
- indexProcessorFig.getIndexQueueVisibilityTimeout(),
- Math.max(1000, queueFig.getQueueClientSocketTimeout() - 1000), // 1 sec less than socket timeout
- AsyncEvent.class);
+ return queue.getMessages(MAX_TAKE, AsyncEvent.class);
}
- //stop our timer
finally {
+ //stop our timer
timer.stop();
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/bcc97808/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
index 4d26100..1f4261a 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
@@ -22,17 +22,13 @@ package org.apache.usergrid.persistence.queue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import rx.Observable;
import java.io.IOException;
-import java.util.AbstractQueue;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
@@ -45,7 +41,7 @@ public class LocalQueueManager implements QueueManager {
public ArrayBlockingQueue<QueueMessage> queue = new ArrayBlockingQueue<>(10000);
@Override
- public List<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) {
+ public List<QueueMessage> getMessages(int limit, Class klass) {
List<QueueMessage> returnQueue = new ArrayList<>();
try {
QueueMessage message=null;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/bcc97808/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
index 7757d58..63d2d80 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
@@ -88,11 +88,11 @@ public interface QueueFig extends GuicyFig {
int getLocalQuorumTimeout();
@Key( "usergrid.queue.client.connection.timeout")
- @Default( "1000" ) // 3 seconds
+ @Default( "1000" ) // 1 second
int getQueueClientConnectionTimeout();
@Key( "usergrid.queue.client.socket.timeout")
- @Default( "3000" ) // 3 seconds
+ @Default( "10000" ) // 10 seconds
int getQueueClientSocketTimeout();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/bcc97808/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
index 4c948e3..18909e4 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
@@ -17,8 +17,6 @@
*/
package org.apache.usergrid.persistence.queue;
-import rx.Observable;
-
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
@@ -31,12 +29,10 @@ public interface QueueManager {
/**
* Read messages from queue
* @param limit
- * @param transactionTimeout timeout in seconds
- * @param waitTime wait time for next message in milliseconds
* @param klass class to cast the return from
* @return List of Queue Messages
*/
- List<QueueMessage> getMessages(int limit,int transactionTimeout, int waitTime, Class klass);
+ List<QueueMessage> getMessages(int limit, Class klass);
/**
* get the queue depth
http://git-wip-us.apache.org/repos/asf/usergrid/blob/bcc97808/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index f1d8c5a..4dd9bda 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -169,7 +169,8 @@ public class SNSQueueManagerImpl implements QueueManager {
this.clientConfiguration = new ClientConfiguration()
.withConnectionTimeout(queueFig.getQueueClientConnectionTimeout())
- .withSocketTimeout(queueFig.getQueueClientSocketTimeout())
+ // don't let the socket timeout be configured less than 5 sec (network delays do happen)
+ .withSocketTimeout(Math.max(5000, queueFig.getQueueClientSocketTimeout()))
.withGzip(true);
try {
@@ -411,8 +412,7 @@ public class SNSQueueManagerImpl implements QueueManager {
@Override
- public List<QueueMessage> getMessages( final int limit, final int transactionTimeout, final int waitTime,
- final Class klass ) {
+ public List<QueueMessage> getMessages(final int limit, final Class klass) {
if ( sqs == null ) {
logger.error( "SQS is null - was not initialized properly" );
@@ -427,8 +427,10 @@ public class SNSQueueManagerImpl implements QueueManager {
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest( url );
receiveMessageRequest.setMaxNumberOfMessages( limit );
- receiveMessageRequest.setVisibilityTimeout( Math.max( 1, transactionTimeout / 1000 ) );
- receiveMessageRequest.setWaitTimeSeconds( waitTime / 1000 );
+ receiveMessageRequest.setVisibilityTimeout( Math.max( 1, fig.getVisibilityTimeout() / 1000 ) );
+
// set the SQS long-poll wait to 3 seconds less than the client socket timeout (to allow for network delays), with a minimum of 0 (no long polling)
+ receiveMessageRequest.setWaitTimeSeconds( Math.max(0, ( fig.getQueueClientSocketTimeout() - 3000) / 1000 ) );
try {
ReceiveMessageResult result = sqs.receiveMessage( receiveMessageRequest );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/bcc97808/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
index c8661c0..d57beab 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
@@ -82,14 +82,14 @@ public class QueueManagerTest {
public void send() throws Exception{
String value = "bodytest";
qm.sendMessage(value);
- List<QueueMessage> messageList = qm.getMessages(1,5000,5000,String.class);
+ List<QueueMessage> messageList = qm.getMessages(1, String.class);
assertTrue(messageList.size() >= 1);
for(QueueMessage message : messageList){
assertTrue(message.getBody().equals(value));
qm.commitMessage(message);
}
- messageList = qm.getMessages(1,5000,5000,String.class);
+ messageList = qm.getMessages(1, String.class);
assertTrue(messageList.size() <= 0);
}
@@ -102,14 +102,14 @@ public class QueueManagerTest {
List<Map<String,String>> bodies = new ArrayList<>();
bodies.add(values);
qm.sendMessages(bodies);
- List<QueueMessage> messageList = qm.getMessages(1,5000,5000,values.getClass());
+ List<QueueMessage> messageList = qm.getMessages(1, values.getClass());
assertTrue(messageList.size() >= 1);
for(QueueMessage message : messageList){
assertTrue(message.getBody().equals(values));
}
qm.commitMessages(messageList);
- messageList = qm.getMessages(1,5000,5000,values.getClass());
+ messageList = qm.getMessages(1, values.getClass());
assertTrue(messageList.size() <= 0);
}
@@ -133,7 +133,7 @@ public class QueueManagerTest {
}
assertTrue(depth>0);
- List<QueueMessage> messageList = qm.getMessages(10,5000,5000,values.getClass());
+ List<QueueMessage> messageList = qm.getMessages(10, values.getClass());
assertTrue(messageList.size() <= 500);
for(QueueMessage message : messageList){
assertTrue(message.getBody().equals(values));
http://git-wip-us.apache.org/repos/asf/usergrid/blob/bcc97808/stack/services/src/main/java/org/apache/usergrid/security/shiro/credentials/ApplicationAccessToken.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/security/shiro/credentials/ApplicationAccessToken.java b/stack/services/src/main/java/org/apache/usergrid/security/shiro/credentials/ApplicationAccessToken.java
index cbfe97d..91d95d2 100644
--- a/stack/services/src/main/java/org/apache/usergrid/security/shiro/credentials/ApplicationAccessToken.java
+++ b/stack/services/src/main/java/org/apache/usergrid/security/shiro/credentials/ApplicationAccessToken.java
@@ -19,6 +19,9 @@ package org.apache.usergrid.security.shiro.credentials;
public class ApplicationAccessToken extends AbstractAccessTokenCredentials implements ApplicationCredentials {
+ // Do not remove, needed for Jackson to handle deserialization
+ public ApplicationAccessToken(){}
+
public ApplicationAccessToken( String token ) {
super( token );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/bcc97808/stack/services/src/main/java/org/apache/usergrid/security/shiro/credentials/OrganizationAccessToken.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/security/shiro/credentials/OrganizationAccessToken.java b/stack/services/src/main/java/org/apache/usergrid/security/shiro/credentials/OrganizationAccessToken.java
index 3ae6f2c..36e78c6 100644
--- a/stack/services/src/main/java/org/apache/usergrid/security/shiro/credentials/OrganizationAccessToken.java
+++ b/stack/services/src/main/java/org/apache/usergrid/security/shiro/credentials/OrganizationAccessToken.java
@@ -19,6 +19,9 @@ package org.apache.usergrid.security.shiro.credentials;
public class OrganizationAccessToken extends AbstractAccessTokenCredentials implements OrganizationCredentials {
+ // Do not remove, needed for Jackson to handle deserialization
+ public OrganizationAccessToken(){}
+
public OrganizationAccessToken( String token ) {
super( token );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/bcc97808/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 0a0e982..de9cf06 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -169,7 +169,7 @@ public class QueueListener {
while ( true ) {
Timer.Context timerContext = timer.time();
- rx.Observable.from(queueManager.getMessages(getBatchSize(), MESSAGE_TRANSACTION_TIMEOUT, 10000, ApplicationQueueMessage.class))
+ rx.Observable.from(queueManager.getMessages(getBatchSize(), ApplicationQueueMessage.class))
.buffer(getBatchSize())
.doOnNext(messages -> {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/bcc97808/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
index 272bb65..f3c65c7 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
@@ -28,7 +28,6 @@ import java.util.List;
import org.apache.usergrid.persistence.queue.QueueManager;
import org.apache.usergrid.persistence.queue.QueueMessage;
-import rx.Observable;
/**
@@ -37,8 +36,7 @@ import rx.Observable;
public class ImportQueueManager implements QueueManager {
@Override
- public List<QueueMessage> getMessages( final int limit, final int transactionTimeout, final int waitTime,
- final Class klass ) {
+ public List<QueueMessage> getMessages(final int limit, final Class klass) {
return new ArrayList<>();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/bcc97808/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
index 7ceb2ae..5895d38 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
@@ -181,7 +181,7 @@ public abstract class QueueListener {
Timer.Context timerContext = timer.time();
//Get the messages out of the queue.
//TODO: a model class to get generic queueMessages out of the queueManager. Ask Shawn what should go here.
- rx.Observable.from( queueManager.getMessages(getBatchSize(), MESSAGE_TRANSACTION_TIMEOUT, 5000, ImportQueueMessage.class))
+ rx.Observable.from( queueManager.getMessages(getBatchSize(), ImportQueueMessage.class))
.buffer(getBatchSize())
.doOnNext(messages -> {
try {