You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/10/31 15:46:32 UTC
[37/37] usergrid git commit: Merge branch 'master' into
usergrid-1318-queue
Merge branch 'master' into usergrid-1318-queue
Conflicts:
content/releases/index.html
stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
website/content/releases/index.html
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/7cc5c1c0
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/7cc5c1c0
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/7cc5c1c0
Branch: refs/heads/usergrid-1318-queue
Commit: 7cc5c1c0701649bfdf31d695c10856f87510f181
Parents: f8c3a2d fe3bf56
Author: Dave Johnson <sn...@apache.org>
Authored: Mon Oct 31 11:45:26 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Mon Oct 31 11:45:26 2016 -0400
----------------------------------------------------------------------
.../docs/_sources/data-storage/collections.txt | 144 +
.../_sources/installation/deployment-guide.txt | 125 +-
.../docs/_sources/orgs-and-apps/application.txt | 104 +
content/docs/data-storage/collections.html | 209 +-
content/docs/index.html | 3 +
content/docs/installation/deployment-guide.html | 16 +-
content/docs/orgs-and-apps/application.html | 141 +
content/docs/searchindex.js | 2 +-
content/releases/index.html | 5 +-
docs/installation/deployment-guide.md | 125 +-
.../asyncevents/AsyncEventService.java | 3 +-
.../asyncevents/AsyncEventServiceImpl.java | 189 +-
.../index/IndexProcessorFig.java | 9 +
.../corepersistence/index/ReIndexAction.java | 5 +-
.../index/ReIndexServiceImpl.java | 4 +-
.../read/traverse/AbstractReadGraphFilter.java | 2 +-
.../AbstractReadReverseGraphFilter.java | 2 +-
.../exceptions/AbstractExceptionMapper.java | 2 +-
.../PasswordPolicyViolationExceptionMapper.java | 48 +
.../UserResource/bad_confirmation_token.jsp | 33 +
.../collection/users/PermissionsResourceIT.java | 4 +-
.../collection/users/UserResourceIT.java | 38 +-
.../usergrid/rest/management/AdminUsersIT.java | 51 +
.../rest/management/ExternalSSOEnabledIT.java | 2 +-
.../rest/management/ManagementResourceIT.java | 4 +-
.../rest/management/RegistrationIT.java | 6 +-
.../cassandra/ManagementServiceImpl.java | 77 +-
.../usergrid/security/PasswordPolicy.java | 53 +
.../usergrid/security/PasswordPolicyFig.java | 79 +
.../usergrid/security/PasswordPolicyImpl.java | 156 +
.../security/sso/ApigeeSSO2Provider.java | 111 +-
.../PasswordPolicyViolationException.java | 46 +
.../services/guice/ServiceModuleImpl.java | 8 +
.../usergrid/security/ApigeeSSO2ProviderIT.java | 297 +
.../usergrid/security/PasswordPolicyTest.java | 47 +
.../security/PasswordPolicyTestFig.java | 161 +
.../usergrid/tools/RemoveAdminUserFromOrg.java | 230 +
website/content/releases/index.html | 6 +-
website/tmp/checksums | 3 +
website/tmp/compiled_content | 14130 +++++++++++++++++
website/tmp/dependencies | 10 +
website/tmp/rule_memory | Bin 0 -> 13045 bytes
42 files changed, 16355 insertions(+), 335 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7cc5c1c0/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index f941c11,dba4edf..9f931d3
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@@ -65,18 -78,11 +65,20 @@@ import rx.Subscriber
import rx.Subscription;
import rx.schedulers.Schedulers;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+ import static org.apache.commons.lang.StringUtils.isNotEmpty;
+
/**
- * TODO, this whole class is becoming a nightmare. We need to remove all consume from this class and refactor it into the following manner.
+ * TODO, this whole class is becoming a nightmare.
+ * We need to remove all consume from this class and refactor it into the following manner.
*
* 1. Produce. Keep the code in the handle as is
* 2. Consume: Move the code into a refactored system
@@@ -99,10 -105,13 +101,13 @@@ public class AsyncEventServiceImpl impl
// SQS maximum receive messages is 10
public int MAX_TAKE = 10;
public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars
+ public static final String QUEUE_NAME_UTILITY = "utility"; //keep this short as AWS limits queue name size to 80 chars
+
- private final QueueManager queue;
- private final QueueManager utilityQueue;
+ private final LegacyQueueManager queue;
++ private final LegacyQueueManager utilityQueue;
private final IndexProcessorFig indexProcessorFig;
- private final QueueFig queueFig;
+ private final LegacyQueueFig queueFig;
private final IndexProducer indexProducer;
private final EntityCollectionManagerFactory entityCollectionManagerFactory;
private final IndexLocationStrategyFactory indexLocationStrategyFactory;
@@@ -155,8 -165,10 +161,14 @@@
this.rxTaskScheduler = rxTaskScheduler;
- LegacyQueueScope queueScope = new LegacyQueueScopeImpl(QUEUE_NAME, LegacyQueueScope.RegionImplementation.ALL);
- QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL);
- QueueScope utilityQueueScope = new QueueScopeImpl(QUEUE_NAME_UTILITY, QueueScope.RegionImplementation.ALL);
++ LegacyQueueScope queueScope =
++ new LegacyQueueScopeImpl(QUEUE_NAME, LegacyQueueScope.RegionImplementation.ALL);
++
++ LegacyQueueScope utilityQueueScope =
++ new LegacyQueueScopeImpl(QUEUE_NAME_UTILITY, LegacyQueueScope.RegionImplementation.ALL);
++
this.queue = queueManagerFactory.getQueueManager(queueScope);
+ this.utilityQueue = queueManagerFactory.getQueueManager(utilityQueueScope);
this.indexProcessorFig = indexProcessorFig;
this.queueFig = queueFig;
@@@ -225,11 -245,20 +245,20 @@@
}
}
+ private void offerBatchToUtilityQueue(final List operations){
+ try {
+ //signal to SQS
+ this.utilityQueue.sendMessages(operations);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to queue message", e);
+ }
+ }
+
/**
- * Take message from SQS
+ * Take message
*/
- private List<QueueMessage> take() {
+ private List<LegacyQueueMessage> take() {
final Timer.Context timer = this.readTimer.time();
@@@ -242,38 -271,59 +271,67 @@@
}
}
+ /**
+ * Take message from SQS utility queue
+ */
- private List<QueueMessage> takeFromUtilityQueue() {
++ private List<LegacyQueueMessage> takeFromUtilityQueue() {
+
+ final Timer.Context timer = this.readTimer.time();
+
+ try {
+ return utilityQueue.getMessages(MAX_TAKE, AsyncEvent.class);
+ }
+ finally {
+ //stop our timer
+ timer.stop();
+ }
+ }
+
-
/**
- * Ack message in SQS
+ * Ack message
*/
- public void ack(final List<QueueMessage> messages) {
+ public void ack(final List<LegacyQueueMessage> messages) {
final Timer.Context timer = this.ackTimer.time();
- try{
- queue.commitMessages( messages );
+ try {
- //decrement our in-flight counter
- inFlight.decrementAndGet();
+ for ( LegacyQueueMessage legacyQueueMessage : messages ) {
+ try {
+ queue.commitMessage( legacyQueueMessage );
+ inFlight.decrementAndGet();
- }catch(Exception e){
- throw new RuntimeException("Unable to ack messages", e);
- }finally {
- timer.stop();
- }
+ } catch ( Throwable t ) {
+ logger.error("Continuing after error acking message: " + legacyQueueMessage.getMessageId() );
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException( "Unable to ack messages", e );
+ } finally {
+ timer.stop();
+ }
}
+
/**
+ * calls the event handlers and returns a result with information on whether
+ * it needs to be ack'd and whether it needs to be indexed
+ * Ack message in SQS
+ */
- public void ackUtilityQueue(final List<QueueMessage> messages) {
++ public void ackUtilityQueue(final List<LegacyQueueMessage> messages) {
+ try{
+ utilityQueue.commitMessages( messages );
+ }catch(Exception e){
+ throw new RuntimeException("Unable to ack messages", e);
+ }
+ }
+
+ /**
- * calls the event handlers and returns a result with information on whether it needs to be ack'd and whether it needs to be indexed
++ * calls the event handlers and returns a result with information on whether
++ * it needs to be ack'd and whether it needs to be indexed
* @param messages
* @return
*/
@@@ -553,11 -593,10 +610,11 @@@
//send to the topic so all regions index the batch
- offerTopic( elasticsearchIndexEvent );
+ offerTopic( elasticsearchIndexEvent, forUtilityQueue );
}
- private void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent) throws IndexDocNotFoundException {
+ private void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent)
+ throws IndexDocNotFoundException {
Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" );
@@@ -755,18 -794,22 +817,23 @@@
}
- private void startWorker() {
+ private void startWorker(final String type) {
+ Preconditions.checkNotNull(type, "Worker type required");
synchronized (mutex) {
+ boolean isUtilityQueue = isNotEmpty(type) && type.toLowerCase().contains(QUEUE_NAME_UTILITY.toLowerCase());
+
- Observable<List<QueueMessage>> consumer =
- Observable.create( new Observable.OnSubscribe<List<QueueMessage>>() {
+ Observable<List<LegacyQueueMessage>> consumer =
+ Observable.create( new Observable.OnSubscribe<List<LegacyQueueMessage>>() {
@Override
- public void call( final Subscriber<? super List<QueueMessage>> subscriber ) {
+ public void call( final Subscriber<? super List<LegacyQueueMessage>> subscriber ) {
//name our thread so it's easy to see
- Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() );
- long threadNum = isUtilityQueue ? counterUtility.incrementAndGet() : counter.incrementAndGet();
++ long threadNum = isUtilityQueue ?
++ counterUtility.incrementAndGet() : counter.incrementAndGet();
+ Thread.currentThread().setName( "QueueConsumer_" + type+ "_" + threadNum );
- List<QueueMessage> drainList = null;
+ List<LegacyQueueMessage> drainList = null;
do {
try {
@@@ -802,42 -850,45 +874,48 @@@
} ) //this won't block our read loop, just reads and proceeds
.flatMap( sqsMessages -> {
- //do this on a different schedule, and introduce concurrency with flatmap for faster processing
+ //do this on a different schedule, and introduce concurrency
+ // with flatmap for faster processing
return Observable.just( sqsMessages )
- .map( messages -> {
- if ( messages == null || messages.size() == 0 ) {
- // no messages came from the queue, move on
- return null;
- }
-
- try {
- // process the messages
- List<IndexEventResult> indexEventResults = callEventHandlers( messages );
-
- // submit the processed messages to index producer
- List<LegacyQueueMessage> messagesToAck = submitToIndex( indexEventResults );
-
- if ( messagesToAck.size() < messages.size() ) {
- logger.warn( "Missing {} message(s) from index processing",
- messages.size() - messagesToAck.size() );
- }
-
- // ack each message if making it to this point
- if( messagesToAck.size() > 0 ){
- ack( messagesToAck );
- }
-
- return messagesToAck;
- }
- catch ( Exception e ) {
- logger.error( "Failed to ack messages", e );
- return null;
- //do not rethrow so we can process all of them
- }
- } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() );
-
+ .map( messages -> {
+ if ( messages == null || messages.size() == 0 ) {
+ // no messages came from the queue, move on
+ return null;
+ }
+
+ try {
+ // process the messages
- List<IndexEventResult> indexEventResults = callEventHandlers( messages );
++ List<IndexEventResult> indexEventResults =
++ callEventHandlers( messages );
+
+ // submit the processed messages to index producer
- List<QueueMessage> messagesToAck = submitToIndex( indexEventResults, isUtilityQueue );
++ List<LegacyQueueMessage> messagesToAck =
++ submitToIndex( indexEventResults, isUtilityQueue );
+
+ if ( messagesToAck.size() < messages.size() ) {
+ logger.warn( "Missing {} message(s) from index processing",
+ messages.size() - messagesToAck.size() );
+ }
+
+ // ack each message if making it to this point
+ if( messagesToAck.size() > 0 ){
+
+ if ( isUtilityQueue ){
+ ackUtilityQueue( messagesToAck );
+ }else{
+ ack( messagesToAck );
+ }
+ }
+
+ return messagesToAck;
+ }
+ catch ( Exception e ) {
+ logger.error( "Failed to ack messages", e );
+ return null;
+ //do not rethrow so we can process all of them
+ }
+ } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() );
//end flatMap
}, indexProcessorFig.getEventConcurrencyFactor() );
@@@ -853,7 -904,7 +931,7 @@@
* Submit results to index and return the queue messages to be ack'd
*
*/
- private List<LegacyQueueMessage> submitToIndex(List<IndexEventResult> indexEventResults) {
- private List<QueueMessage> submitToIndex(List<IndexEventResult> indexEventResults, boolean forUtilityQueue) {
++ private List<LegacyQueueMessage> submitToIndex(List<IndexEventResult> indexEventResults, boolean forUtilityQueue) {
// if nothing came back then return empty list
if(indexEventResults==null){
@@@ -890,17 -941,16 +968,18 @@@
EntityIndexOperation entityIndexOperation =
new EntityIndexOperation( applicationScope, id, updatedSince);
- queueIndexOperationMessage(eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null), false);
+ queueIndexOperationMessage(
- eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null));
++ eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null), false);
}
- public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {
+ public void indexBatch(final List<EdgeScope> edges, final long updatedSince, boolean forUtilityQueue) {
final List<EntityIndexEvent> batch = new ArrayList<>();
edges.forEach(e -> {
//change to id scope to avoid serialization issues
-- batch.add(new EntityIndexEvent(queueFig.getPrimaryRegion(), new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()), updatedSince));
++ batch.add(new EntityIndexEvent(queueFig.getPrimaryRegion(),
++ new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()), updatedSince));
});
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7cc5c1c0/website/content/releases/index.html
----------------------------------------------------------------------
diff --cc website/content/releases/index.html
index 24fbe66,28fb6d7..7e54686
--- a/website/content/releases/index.html
+++ b/website/content/releases/index.html
@@@ -31,10 -31,7 +31,8 @@@
Project releases are approved by vote of the Apache Usergrid Project Management Committee (PMC). Support for a release is provided by project volunteers on the project <a href="http://usergrid.apache.org/community/#mailing-lists">mailing lists</a>. Bugs found in a release may be discussed on the list and reported through the <a href="https://issues.apache.org/jira/browse/USERGRID">issue tracker</a>. The user mailing list and issue tracker are the only support options hosted by the Apache Usergrid project.
</p>
<p>
- Note: When downloading from a mirror, please be sure to verify that checksums and signatures are correct. To do so, use the checksum and signature files from the main Apache site at <a href="https://www.apache.org/dist/release/usergrid/usergrid-2/v2.1.0/">https://www.apache.org/dist/release/usergrid/usergrid-2/v2.1.0/</a>. Find here the KEYS file, which contains all OpenPGP keys we use to sign releases here: <a href="https://www.apache.org/dist/usergrid/KEYS">https://www.apache.org/dist/usergrid/KEYS</a>
- </p>
- <p>
- The PGP signatures can be verified using PGP or GPG. First download the <a href="https://www.apache.org/dist/usergrid/KEYS">KEYS</a> as well as the <a href="https://www.apache.org/dist/release/usergrid/usergrid-2/v2.1.0/">asc signature</a> file for the particular distribution. Then verify the signatures using:
++
+ Note: When downloading from a mirror, please be sure to verify that checksums and signatures are correct. To do so, use the checksum and signature files from the main Apache site at <a href="https://www.apache.org/dist/usergrid/usergrid-2/v2.1.0/">https://dist.apache.org/repos/dist/release/usergrid/usergrid-2/v2.1.0/</a>. Find here the KEYS file, which contains all OpenPGP keys we use to sign releases here: <a href="https://www.apache.org/dist/usergrid/KEYS">https://www.apache.org/dist/usergrid/KEYS</a>
</p>
<p>
% pgpk -a KEYS