You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/10/31 15:46:32 UTC

[37/37] usergrid git commit: Merge branch 'master' into usergrid-1318-queue

Merge branch 'master' into usergrid-1318-queue

Conflicts:
	content/releases/index.html
	stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
	website/content/releases/index.html


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/7cc5c1c0
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/7cc5c1c0
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/7cc5c1c0

Branch: refs/heads/usergrid-1318-queue
Commit: 7cc5c1c0701649bfdf31d695c10856f87510f181
Parents: f8c3a2d fe3bf56
Author: Dave Johnson <sn...@apache.org>
Authored: Mon Oct 31 11:45:26 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Mon Oct 31 11:45:26 2016 -0400

----------------------------------------------------------------------
 .../docs/_sources/data-storage/collections.txt  |   144 +
 .../_sources/installation/deployment-guide.txt  |   125 +-
 .../docs/_sources/orgs-and-apps/application.txt |   104 +
 content/docs/data-storage/collections.html      |   209 +-
 content/docs/index.html                         |     3 +
 content/docs/installation/deployment-guide.html |    16 +-
 content/docs/orgs-and-apps/application.html     |   141 +
 content/docs/searchindex.js                     |     2 +-
 content/releases/index.html                     |     5 +-
 docs/installation/deployment-guide.md           |   125 +-
 .../asyncevents/AsyncEventService.java          |     3 +-
 .../asyncevents/AsyncEventServiceImpl.java      |   189 +-
 .../index/IndexProcessorFig.java                |     9 +
 .../corepersistence/index/ReIndexAction.java    |     5 +-
 .../index/ReIndexServiceImpl.java               |     4 +-
 .../read/traverse/AbstractReadGraphFilter.java  |     2 +-
 .../AbstractReadReverseGraphFilter.java         |     2 +-
 .../exceptions/AbstractExceptionMapper.java     |     2 +-
 .../PasswordPolicyViolationExceptionMapper.java |    48 +
 .../UserResource/bad_confirmation_token.jsp     |    33 +
 .../collection/users/PermissionsResourceIT.java |     4 +-
 .../collection/users/UserResourceIT.java        |    38 +-
 .../usergrid/rest/management/AdminUsersIT.java  |    51 +
 .../rest/management/ExternalSSOEnabledIT.java   |     2 +-
 .../rest/management/ManagementResourceIT.java   |     4 +-
 .../rest/management/RegistrationIT.java         |     6 +-
 .../cassandra/ManagementServiceImpl.java        |    77 +-
 .../usergrid/security/PasswordPolicy.java       |    53 +
 .../usergrid/security/PasswordPolicyFig.java    |    79 +
 .../usergrid/security/PasswordPolicyImpl.java   |   156 +
 .../security/sso/ApigeeSSO2Provider.java        |   111 +-
 .../PasswordPolicyViolationException.java       |    46 +
 .../services/guice/ServiceModuleImpl.java       |     8 +
 .../usergrid/security/ApigeeSSO2ProviderIT.java |   297 +
 .../usergrid/security/PasswordPolicyTest.java   |    47 +
 .../security/PasswordPolicyTestFig.java         |   161 +
 .../usergrid/tools/RemoveAdminUserFromOrg.java  |   230 +
 website/content/releases/index.html             |     6 +-
 website/tmp/checksums                           |     3 +
 website/tmp/compiled_content                    | 14130 +++++++++++++++++
 website/tmp/dependencies                        |    10 +
 website/tmp/rule_memory                         |   Bin 0 -> 13045 bytes
 42 files changed, 16355 insertions(+), 335 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/7cc5c1c0/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index f941c11,dba4edf..9f931d3
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@@ -65,18 -78,11 +65,20 @@@ import rx.Subscriber
  import rx.Subscription;
  import rx.schedulers.Schedulers;
  
 +import java.io.IOException;
 +import java.io.Serializable;
 +import java.util.*;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.stream.Collectors;
 +import java.util.stream.Stream;
 +
+ import static org.apache.commons.lang.StringUtils.isNotEmpty;
+ 
  
  /**
 - * TODO, this whole class is becoming a nightmare.  We need to remove all consume from this class and refactor it into the following manner.
 + * TODO, this whole class is becoming a nightmare.
 + * We need to remove all consume from this class and refactor it into the following manner.
   *
   * 1. Produce.  Keep the code in the handle as is
   * 2. Consume:  Move the code into a refactored system
@@@ -99,10 -105,13 +101,13 @@@ public class AsyncEventServiceImpl impl
      // SQS maximum receive messages is 10
      public int MAX_TAKE = 10;
      public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars
+     public static final String QUEUE_NAME_UTILITY = "utility"; //keep this short as AWS limits queue name size to 80 chars
+ 
  
 -    private final QueueManager queue;
 -    private final QueueManager utilityQueue;
 +    private final LegacyQueueManager queue;
++    private final LegacyQueueManager utilityQueue;
      private final IndexProcessorFig indexProcessorFig;
 -    private final QueueFig queueFig;
 +    private final LegacyQueueFig queueFig;
      private final IndexProducer indexProducer;
      private final EntityCollectionManagerFactory entityCollectionManagerFactory;
      private final IndexLocationStrategyFactory indexLocationStrategyFactory;
@@@ -155,8 -165,10 +161,14 @@@
  
          this.rxTaskScheduler = rxTaskScheduler;
  
-         LegacyQueueScope queueScope = new LegacyQueueScopeImpl(QUEUE_NAME, LegacyQueueScope.RegionImplementation.ALL);
 -        QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL);
 -        QueueScope utilityQueueScope = new QueueScopeImpl(QUEUE_NAME_UTILITY, QueueScope.RegionImplementation.ALL);
++        LegacyQueueScope queueScope =
++            new LegacyQueueScopeImpl(QUEUE_NAME, LegacyQueueScope.RegionImplementation.ALL);
++
++        LegacyQueueScope utilityQueueScope =
++            new LegacyQueueScopeImpl(QUEUE_NAME_UTILITY, LegacyQueueScope.RegionImplementation.ALL);
++
          this.queue = queueManagerFactory.getQueueManager(queueScope);
+         this.utilityQueue = queueManagerFactory.getQueueManager(utilityQueueScope);
  
          this.indexProcessorFig = indexProcessorFig;
          this.queueFig = queueFig;
@@@ -225,11 -245,20 +245,20 @@@
          }
      }
  
+     private void offerBatchToUtilityQueue(final List operations){
+         try {
+             //signal to SQS
+             this.utilityQueue.sendMessages(operations);
+         } catch (IOException e) {
+             throw new RuntimeException("Unable to queue message", e);
+         }
+     }
+ 
  
      /**
 -     * Take message from SQS
 +     * Take message
       */
 -    private List<QueueMessage> take() {
 +    private List<LegacyQueueMessage> take() {
  
          final Timer.Context timer = this.readTimer.time();
  
@@@ -242,38 -271,59 +271,67 @@@
          }
      }
  
+     /**
+      * Take message from SQS utility queue
+      */
 -    private List<QueueMessage> takeFromUtilityQueue() {
++    private List<LegacyQueueMessage> takeFromUtilityQueue() {
+ 
+         final Timer.Context timer = this.readTimer.time();
+ 
+         try {
+             return utilityQueue.getMessages(MAX_TAKE, AsyncEvent.class);
+         }
+         finally {
+             //stop our timer
+             timer.stop();
+         }
+     }
+ 
  
 -
      /**
 -     * Ack message in SQS
 +     * Ack message
       */
 -    public void ack(final List<QueueMessage> messages) {
 +    public void ack(final List<LegacyQueueMessage> messages) {
  
          final Timer.Context timer = this.ackTimer.time();
  
 -        try{
 -            queue.commitMessages( messages );
 +        try {
  
 -            //decrement our in-flight counter
 -            inFlight.decrementAndGet();
 +            for ( LegacyQueueMessage legacyQueueMessage : messages ) {
 +                try {
 +                    queue.commitMessage( legacyQueueMessage );
 +                    inFlight.decrementAndGet();
  
 -        }catch(Exception e){
 -            throw new RuntimeException("Unable to ack messages", e);
 -        }finally {
 -            timer.stop();
 -        }
 +                } catch ( Throwable t ) {
 +                    logger.error("Continuing after error acking message: " + legacyQueueMessage.getMessageId() );
 +                }
 +            }
  
 +        } catch (Exception e) {
 +            throw new RuntimeException( "Unable to ack messages", e );
  
 +        } finally {
 +            timer.stop();
 +        }
      }
  
 +
      /**
 +     * calls the event handlers and returns a result with information on whether
 +     * it needs to be ack'd and whether it needs to be indexed
+      * Ack message in SQS
+      */
 -    public void ackUtilityQueue(final List<QueueMessage> messages) {
++    public void ackUtilityQueue(final List<LegacyQueueMessage> messages) {
+         try{
+             utilityQueue.commitMessages( messages );
+         }catch(Exception e){
+             throw new RuntimeException("Unable to ack messages", e);
+         }
+     }
+ 
+     /**
 -     * calls the event handlers and returns a result with information on whether it needs to be ack'd and whether it needs to be indexed
++     * calls the event handlers and returns a result with information on whether
++     * it needs to be ack'd and whether it needs to be indexed
       * @param messages
       * @return
       */
@@@ -553,11 -593,10 +610,11 @@@
  
          //send to the topic so all regions index the batch
  
-         offerTopic( elasticsearchIndexEvent );
+         offerTopic( elasticsearchIndexEvent, forUtilityQueue );
      }
  
 -    private void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent) throws IndexDocNotFoundException {
 +    private void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent)
 +        throws IndexDocNotFoundException {
  
          Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" );
  
@@@ -755,18 -794,22 +817,23 @@@
      }
  
  
-     private void startWorker() {
+     private void startWorker(final String type) {
+         Preconditions.checkNotNull(type, "Worker type required");
          synchronized (mutex) {
  
+             boolean isUtilityQueue = isNotEmpty(type) && type.toLowerCase().contains(QUEUE_NAME_UTILITY.toLowerCase());
+ 
 -            Observable<List<QueueMessage>> consumer =
 -                    Observable.create( new Observable.OnSubscribe<List<QueueMessage>>() {
 +            Observable<List<LegacyQueueMessage>> consumer =
 +                    Observable.create( new Observable.OnSubscribe<List<LegacyQueueMessage>>() {
                          @Override
 -                        public void call( final Subscriber<? super List<QueueMessage>> subscriber ) {
 +                        public void call( final Subscriber<? super List<LegacyQueueMessage>> subscriber ) {
  
                              //name our thread so it's easy to see
-                             Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() );
 -                            long threadNum = isUtilityQueue ? counterUtility.incrementAndGet() : counter.incrementAndGet();
++                            long threadNum = isUtilityQueue ?
++                                counterUtility.incrementAndGet() : counter.incrementAndGet();
+                             Thread.currentThread().setName( "QueueConsumer_" + type+ "_" + threadNum );
  
 -                                List<QueueMessage> drainList = null;
 +                            List<LegacyQueueMessage> drainList = null;
  
                              do {
                                  try {
@@@ -802,42 -850,45 +874,48 @@@
                      } )        //this won't block our read loop, just reads and proceeds
                          .flatMap( sqsMessages -> {
  
 -                            //do this on a different schedule, and introduce concurrency with flatmap for faster processing
 +                            //do this on a different schedule, and introduce concurrency
 +                            // with flatmap for faster processing
                              return Observable.just( sqsMessages )
  
-                                  .map( messages -> {
-                                      if ( messages == null || messages.size() == 0 ) {
-                                          // no messages came from the queue, move on
-                                          return null;
-                                      }
- 
-                                      try {
-                                          // process the messages
-                                          List<IndexEventResult> indexEventResults = callEventHandlers( messages );
- 
-                                          // submit the processed messages to index producer
-                                          List<LegacyQueueMessage> messagesToAck = submitToIndex( indexEventResults );
- 
-                                          if ( messagesToAck.size() < messages.size() ) {
-                                              logger.warn( "Missing {} message(s) from index processing",
-                                                 messages.size() - messagesToAck.size() );
-                                          }
- 
-                                          // ack each message if making it to this point
-                                          if( messagesToAck.size() > 0 ){
-                                              ack( messagesToAck );
-                                          }
- 
-                                          return messagesToAck;
-                                      }
-                                      catch ( Exception e ) {
-                                          logger.error( "Failed to ack messages", e );
-                                          return null;
-                                          //do not rethrow so we can process all of them
-                                      }
-                                  } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() );
- 
+                                              .map( messages -> {
+                                                  if ( messages == null || messages.size() == 0 ) {
+                                                      // no messages came from the queue, move on
+                                                      return null;
+                                                  }
+ 
+                                                  try {
+                                                      // process the messages
 -                                                     List<IndexEventResult> indexEventResults = callEventHandlers( messages );
++                                                     List<IndexEventResult> indexEventResults =
++                                                         callEventHandlers( messages );
+ 
+                                                      // submit the processed messages to index producer
 -                                                     List<QueueMessage> messagesToAck = submitToIndex( indexEventResults, isUtilityQueue );
++                                                     List<LegacyQueueMessage> messagesToAck =
++                                                         submitToIndex( indexEventResults, isUtilityQueue );
+ 
+                                                      if ( messagesToAck.size() < messages.size() ) {
+                                                          logger.warn( "Missing {} message(s) from index processing",
+                                                             messages.size() - messagesToAck.size() );
+                                                      }
+ 
+                                                      // ack each message if making it to this point
+                                                      if( messagesToAck.size() > 0 ){
+ 
+                                                          if ( isUtilityQueue ){
+                                                              ackUtilityQueue( messagesToAck );
+                                                          }else{
+                                                              ack( messagesToAck );
+                                                          }
+                                                      }
+ 
+                                                      return messagesToAck;
+                                                  }
+                                                  catch ( Exception e ) {
+                                                      logger.error( "Failed to ack messages", e );
+                                                      return null;
+                                                      //do not rethrow so we can process all of them
+                                                  }
+                                              } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() );
                              //end flatMap
                          }, indexProcessorFig.getEventConcurrencyFactor() );
  
@@@ -853,7 -904,7 +931,7 @@@
       * Submit results to index and return the queue messages to be ack'd
       *
       */
-     private List<LegacyQueueMessage> submitToIndex(List<IndexEventResult> indexEventResults) {
 -    private List<QueueMessage> submitToIndex(List<IndexEventResult> indexEventResults, boolean forUtilityQueue) {
++    private List<LegacyQueueMessage> submitToIndex(List<IndexEventResult> indexEventResults, boolean forUtilityQueue) {
  
          // if nothing came back then return empty list
          if(indexEventResults==null){
@@@ -890,17 -941,16 +968,18 @@@
          EntityIndexOperation entityIndexOperation =
              new EntityIndexOperation( applicationScope, id, updatedSince);
  
 -        queueIndexOperationMessage(eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null), false);
 +        queueIndexOperationMessage(
-             eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null));
++            eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null), false);
      }
  
-     public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {
+     public void indexBatch(final List<EdgeScope> edges, final long updatedSince, boolean forUtilityQueue) {
  
          final List<EntityIndexEvent> batch = new ArrayList<>();
          edges.forEach(e -> {
  
              //change to id scope to avoid serialization issues
--            batch.add(new EntityIndexEvent(queueFig.getPrimaryRegion(), new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()), updatedSince));
++            batch.add(new EntityIndexEvent(queueFig.getPrimaryRegion(),
++                new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()), updatedSince));
  
          });
  

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7cc5c1c0/website/content/releases/index.html
----------------------------------------------------------------------
diff --cc website/content/releases/index.html
index 24fbe66,28fb6d7..7e54686
--- a/website/content/releases/index.html
+++ b/website/content/releases/index.html
@@@ -31,10 -31,7 +31,8 @@@
  					Project releases are approved by vote of the Apache Usergrid Project Management Committee (PMC). Support for a release is provided by project volunteers on the project <a href="http://usergrid.apache.org/community/#mailing-lists">mailing lists</a>. Bugs found in a release may be discussed on the list and reported through the <a href="https://issues.apache.org/jira/browse/USERGRID">issue tracker</a>. The user mailing list and issue tracker are the only support options hosted by the Apache Usergrid project.
  				</p>
  				<p>
- 					Note: When downloading from a mirror, please be sure to verify that checksums and signatures are correct. To do so, use the checksum and signature files from the main Apache site at <a href="https://www.apache.org/dist/release/usergrid/usergrid-2/v2.1.0/">https://www.apache.org/dist/release/usergrid/usergrid-2/v2.1.0/</a>. Find here the KEYS file, which contains all OpenPGP keys we use to sign releases here: <a href="https://www.apache.org/dist/usergrid/KEYS">https://www.apache.org/dist/usergrid/KEYS</a>
- 				</p>
- 				<p>
- 					The PGP signatures can be verified using PGP or GPG. First download the <a href="https://www.apache.org/dist/usergrid/KEYS">KEYS</a> as well as the <a href="https://www.apache.org/dist/release/usergrid/usergrid-2/v2.1.0/">asc signature</a> file for the particular distribution. Then verify the signatures using:
++
+ 					Note: When downloading from a mirror, please be sure to verify that checksums and signatures are correct. To do so, use the checksum and signature files from the main Apache site at <a href="https://www.apache.org/dist/usergrid/usergrid-2/v2.1.0/">https://dist.apache.org/repos/dist/release/usergrid/usergrid-2/v2.1.0/</a>. Find here the KEYS file, which contains all OpenPGP keys we use to sign releases here: <a href="https://www.apache.org/dist/usergrid/KEYS">https://www.apache.org/dist/usergrid/KEYS</a>
  				</p>
  				<p>
         				% pgpk -a KEYS