You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2016/04/08 22:21:11 UTC

[24/36] usergrid git commit: Merge branch 'release-2.1.1' of https://git-wip-us.apache.org/repos/asf/usergrid into selectiveIndexingCherryPicked

Merge branch 'release-2.1.1' of https://git-wip-us.apache.org/repos/asf/usergrid into selectiveIndexingCherryPicked

# By Michael Russo
# Via Michael Russo
* 'release-2.1.1' of https://git-wip-us.apache.org/repos/asf/usergrid:
  Add concurrency to re-index and fix de-serialization problems with Shiro cache, plus comments.
  Fix variable using within Observable path.
  Change scoped cache logging to debug.
  Fix StaleIndexCleanup test to match the current design for removing stale,  un-updated entities.
  Clean up older versions of entities from the index after an entity update.
  Fix spelling of bootstrap.
  Organize the /system/database/setup and /system/database/bootstrap flows.
  USERGRID-1192 - default to use TRANSPORT client for Elasticsearch SDK.
  Set connect timeout in Astyanax driver.
  Add connect timeout to Asytanax driver (set same as socket timeout for now).
  Add caching around getOrganizationConfigForApplication.
  Ignore new test that's not completely accurate yet.
  Fix issue with cursor paging when using the smart shard iterator.
  Fix graph cursor paging with new shard iterator.
  Fix legacy column parser for edge shards.

Conflicts:
	stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/0aa241d4
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/0aa241d4
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/0aa241d4

Branch: refs/heads/release-2.1.1
Commit: 0aa241d463c57fbae868e8abb0bf48ebc7aeb1cb
Parents: 12a3be9 f5daca5
Author: George Reyes <gr...@apache.org>
Authored: Wed Mar 30 17:07:59 2016 -0700
Committer: George Reyes <gr...@apache.org>
Committed: Wed Mar 30 17:07:59 2016 -0700

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        |   7 +-
 .../corepersistence/CpEntityManagerFactory.java |  34 ++--
 .../usergrid/corepersistence/CpSetup.java       | 100 ++++------
 .../asyncevents/AsyncEventService.java          |   7 +
 .../asyncevents/AsyncEventServiceImpl.java      |  33 +++-
 .../asyncevents/EventBuilder.java               |  12 +-
 .../asyncevents/EventBuilderImpl.java           |  36 +++-
 .../asyncevents/model/AsyncEvent.java           |   5 +-
 .../model/DeIndexOldVersionsEvent.java          |  50 +++++
 .../asyncevents/model/EdgeDeleteEvent.java      |   4 +-
 .../model/ElasticsearchIndexEvent.java          |   2 +-
 .../asyncevents/model/EntityDeleteEvent.java    |   3 +
 .../model/InitializeApplicationIndexEvent.java  |   2 +-
 .../index/IndexProcessorFig.java                |  11 +-
 .../index/ReIndexServiceImpl.java               |  42 ++---
 .../read/traverse/AbstractReadGraphFilter.java  |  18 +-
 .../persistence/EntityManagerFactory.java       |   2 +-
 .../persistence/cassandra/CassandraService.java |   2 -
 .../usergrid/persistence/cassandra/Setup.java   |  28 +--
 .../corepersistence/StaleIndexCleanupTest.java  | 185 +------------------
 .../usergrid/persistence/CoreSchemaManager.java |  14 +-
 .../impl/ScopedCacheSerializationImpl.java      |   9 +-
 .../core/astyanax/CassandraClusterImpl.java     |   4 +-
 .../astyanax/MultiRowShardColumnIterator.java   |  36 +++-
 .../persistence/core/shard/SmartShard.java      |  11 +-
 .../usergrid/persistence/graph/GraphFig.java    |   8 +
 .../graph/impl/GraphManagerImpl.java            |   2 +-
 .../impl/shard/impl/EdgeSearcher.java           |  33 +++-
 .../shard/impl/EdgeShardSerializationImpl.java  |  13 +-
 .../impl/ShardedEdgeSerializationImpl.java      |  42 ++++-
 .../impl/shard/impl/ShardsColumnIterator.java   |  31 +++-
 .../shard/impl/serialize/ShardSerializer.java   |  33 +---
 .../usergrid/persistence/index/IndexFig.java    |   2 +-
 .../usergrid/rest/system/DatabaseResource.java  |   4 +-
 .../collection/paging/PagingResourceIT.java     |  70 ++++++-
 .../resources/usergrid-custom-test.properties   |   4 +
 .../cassandra/ManagementServiceImpl.java        |  67 +++++--
 .../shiro/credentials/AdminUserAccessToken.java |   3 +
 .../ApplicationClientCredentials.java           |   5 +
 .../shiro/principals/AdminUserPrincipal.java    |   3 +
 .../principals/ApplicationGuestPrincipal.java   |   4 +-
 .../shiro/principals/ApplicationPrincipal.java  |   4 +-
 .../principals/ApplicationUserPrincipal.java    |   3 +
 .../shiro/principals/OrganizationPrincipal.java |   4 +-
 .../usergrid/cassandra/SchemaManager.java       |   3 -
 .../usergrid/cassandra/FakeSchemaManager.java   |   8 +-
 46 files changed, 559 insertions(+), 444 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/0aa241d4/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0aa241d4/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0aa241d4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0aa241d4/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index 22ec9d9,b5f7d77..1b68712
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@@ -81,9 -75,8 +81,10 @@@ public class ReIndexServiceImpl impleme
      private final AllEntityIdsObservable allEntityIdsObservable;
      private final IndexProcessorFig indexProcessorFig;
      private final MapManager mapManager;
 +    private final MapManagerFactory mapManagerFactory;
      private final AsyncEventService indexService;
      private final EntityIndexFactory entityIndexFactory;
++    private final IndexSchemaCacheFactory indexSchemaCacheFactory;
  
  
      @Inject
@@@ -93,6 -86,6 +94,7 @@@
                                 final MapManagerFactory mapManagerFactory,
                                 final AllApplicationsObservable allApplicationsObservable,
                                 final IndexProcessorFig indexProcessorFig,
++                               final IndexSchemaCacheFactory indexSchemaCacheFactory,
                                 final AsyncEventService indexService ) {
          this.entityIndexFactory = entityIndexFactory;
          this.indexLocationStrategyFactory = indexLocationStrategyFactory;
@@@ -100,7 -93,7 +102,8 @@@
          this.allApplicationsObservable = allApplicationsObservable;
          this.indexProcessorFig = indexProcessorFig;
          this.indexService = indexService;
 -
++        this.indexSchemaCacheFactory = indexSchemaCacheFactory;
 +        this.mapManagerFactory = mapManagerFactory;
          this.mapManager = mapManagerFactory.createMapManager( RESUME_MAP_SCOPE );
      }
  
@@@ -134,47 -122,19 +137,44 @@@
  
          // create an observable that loads a batch to be indexed
  
-         Observable<List<EdgeScope>> runningReIndex = allEntityIdsObservable.getEdgesToEntities( applicationScopes,
-             reIndexRequestBuilder.getCollectionName(), cursorSeek.getSeekValue() )
-             .buffer( indexProcessorFig.getReindexBufferSize());
- 
-         //Check to see if we have a selective indexing schema, if we do then update its reindexing time, otherwise
-         //don't update or create anything.
 -        final Observable<List<EdgeScope>> runningReIndex = allEntityIdsObservable.getEdgesToEntities( applicationScopes,
 +        if(reIndexRequestBuilder.getCollectionName().isPresent()) {
 +            String collectionName =  InflectionUtils.pluralize( CpNamingUtils.getNameFromEdgeType(reIndexRequestBuilder.getCollectionName().get() ));
 +            MapManager collectionMapStorage = mapManagerFactory.createMapManager( CpNamingUtils.getEntityTypeMapScope( appId.get().getApplication()  ) );
-             String jsonSchemaMap = collectionMapStorage.getString( collectionName );
++            IndexSchemaCache indexSchemaCache = indexSchemaCacheFactory.getInstance( collectionMapStorage );
++
++            java.util.Optional<String> collectionIndexingSchema =  indexSchemaCache.getCollectionSchema( collectionName );
 +
 +
++            String jsonSchemaMap =  null;
++
++            if(collectionIndexingSchema.isPresent()){
++                jsonSchemaMap = collectionIndexingSchema.get();
++            }
++
 +            //If we do have a schema then parse it and add it to a list of properties we want to keep.Otherwise return.
 +            if ( jsonSchemaMap != null ) {
 +
 +                Map jsonMapData = ( Map ) JsonUtils.parse( jsonSchemaMap );
 +
 +                jsonMapData.put( "lastReindexed", Instant.now().toEpochMilli() );
 +                collectionMapStorage.putString( collectionName,JsonUtils.mapToJsonString(jsonMapData )  );
 +            }
 +
 +        }
 +
-         if(delayTimer.isPresent()){
- 
-             if(timeUnitOptional.isPresent()){
-                 runningReIndex = runningReIndex.delay( delayTimer.get(),timeUnitOptional.get() );
-             }
-             else{
-                 runningReIndex = runningReIndex.delay( delayTimer.get(), TimeUnit.MILLISECONDS );
-             }
- 
-         }
++        Observable<List<EdgeScope>> runningReIndex = allEntityIdsObservable.getEdgesToEntities( applicationScopes,
+             reIndexRequestBuilder.getCollectionName(), cursorSeek.getSeekValue() )
 +
-         runningReIndex = runningReIndex.doOnNext(edges -> {
-                 logger.info("Sending batch of {} to be indexed.", edges.size());
-                 indexService.indexBatch(edges, modifiedSince);
+             .buffer( indexProcessorFig.getReindexBufferSize())
+             .flatMap( edgeScopes -> Observable.just(edgeScopes)
+                 .doOnNext(edges -> {
  
-         });
+                     logger.info("Sending batch of {} to be indexed.", edges.size());
+                     indexService.indexBatch(edges, modifiedSince);
+                 })
+                 .subscribeOn( Schedulers.io() ), indexProcessorFig.getReindexConcurrencyFactor());
  
  
 -
          //start our sampler and state persistence
          //take a sample every sample interval to allow us to resume state with minimal loss
          //create our flushing collector and flush the edge scopes to it