You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/12/12 15:30:13 UTC

[14/50] [abbrv] incubator-usergrid git commit: merge

merge


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/21630f49
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/21630f49
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/21630f49

Branch: refs/heads/no-source-in-es
Commit: 21630f498e7f59cd760e0fd2cd69c1f38de25f45
Parents: 8cdfaf9 a63b856
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Dec 8 17:23:52 2014 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Dec 8 17:23:52 2014 -0700

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        | 230 ++++++++++---------
 .../corepersistence/CpEntityManagerFactory.java |   6 +-
 .../corepersistence/util/CpEntityMapUtils.java  |  32 ++-
 .../batch/job/SchedulerRuntimeIntervalIT.java   |  49 ++--
 .../migration/EntityDataMigrationIT.java        | 139 +++++------
 .../migration/EntityTypeMappingMigrationIT.java |  89 +++----
 .../apache/usergrid/persistence/CounterIT.java  |  89 ++++---
 .../usergrid/persistence/CountingMutatorIT.java |   4 +-
 .../persistence/EntityDictionaryIT.java         |  11 +-
 .../usergrid/persistence/EntityManagerIT.java   |  47 ++--
 .../persistence/GeoQueryBooleanTest.java        |   7 +-
 .../apache/usergrid/persistence/IndexIT.java    |  14 +-
 .../usergrid/persistence/PathQueryIT.java       |  10 +-
 .../PerformanceEntityRebuildIndexTest.java      |   6 +-
 .../usergrid/persistence/PermissionsIT.java     |   5 +-
 .../cassandra/EntityManagerFactoryImplIT.java   |   9 +-
 .../query/AbstractIteratingQueryIT.java         |   3 +-
 stack/core/src/test/resources/log4j.properties  |   5 +-
 .../data/DataMigrationManagerImpl.java          |  29 ++-
 .../persistence/index/AliasedEntityIndex.java   |  55 +++++
 .../usergrid/persistence/index/EntityIndex.java |  26 ---
 .../persistence/index/IndexIdentifier.java      |   4 +
 .../index/impl/EsEntityIndexBatchImpl.java      |   6 +-
 .../index/impl/EsEntityIndexImpl.java           |  88 ++++---
 .../persistence/index/impl/EntityIndexTest.java |  15 +-
 .../persistence/queue/DefaultQueueManager.java  |  68 ++++++
 .../usergrid/services/TestQueueManager.java     |  65 ------
 .../AbstractServiceNotificationIT.java          |  16 +-
 .../apns/NotificationsServiceIT.java            |  12 +-
 .../gcm/NotificationsServiceIT.java             |  15 +-
 30 files changed, 647 insertions(+), 507 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/21630f49/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index f6b0df8,f6b0df8..56eb258
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@@ -64,6 -64,6 +64,7 @@@ import org.apache.usergrid.persistence.
  import org.apache.usergrid.persistence.graph.GraphManagerFactory;
  import org.apache.usergrid.persistence.graph.SearchByEdgeType;
  import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
++import org.apache.usergrid.persistence.index.AliasedEntityIndex;
  import org.apache.usergrid.persistence.index.EntityIndex;
  import org.apache.usergrid.persistence.index.EntityIndexFactory;
  import org.apache.usergrid.persistence.index.query.Query;
@@@ -702,7 -702,7 +703,10 @@@ public class CpEntityManagerFactory imp
  
      @Override
      public void addIndex(final UUID applicationId,final String indexSuffix,final int shards,final int replicas){
--        getManagerCache().getEntityIndex(CpNamingUtils.getApplicationScope( applicationId )).addIndex(indexSuffix,shards,replicas);
++        EntityIndex entityIndex = getManagerCache().getEntityIndex(CpNamingUtils.getApplicationScope(applicationId));
++        if(entityIndex instanceof AliasedEntityIndex) {
++            ((AliasedEntityIndex)entityIndex).addIndex(indexSuffix, shards, replicas);
++        }
      }
  
      @Override

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/21630f49/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/AliasedEntityIndex.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/AliasedEntityIndex.java
index 0000000,0000000..15a0d45
new file mode 100644
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/AliasedEntityIndex.java
@@@ -1,0 -1,0 +1,55 @@@
++/*
++ *
++ *  * Licensed to the Apache Software Foundation (ASF) under one or more
++ *  *  contributor license agreements.  The ASF licenses this file to You
++ *  * under the Apache License, Version 2.0 (the "License"); you may not
++ *  * use this file except in compliance with the License.
++ *  * You may obtain a copy of the License at
++ *  *
++ *  *     http://www.apache.org/licenses/LICENSE-2.0
++ *  *
++ *  * Unless required by applicable law or agreed to in writing, software
++ *  * distributed under the License is distributed on an "AS IS" BASIS,
++ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ *  * See the License for the specific language governing permissions and
++ *  * limitations under the License.  For additional information regarding
++ *  * copyright in this work, please see the NOTICE file in the top level
++ *  * directory of this distribution.
++ *
++ */
++
++package org.apache.usergrid.persistence.index;
++
++/**
++ * EntityIndex with aliases for multiple indexes
++ */
++public interface AliasedEntityIndex extends EntityIndex{
++
++    /**
++     * Get the indexes for an alias
++     * @param aliasType name of alias
++     * @return list of index names
++     */
++    public String[] getIndexes(final AliasType aliasType);
++
++    /**
++     * Add alias to index, will remove old index from write alias
++     * @param indexSuffix must be different than current index
++     */
++    public void addAlias(final String indexSuffix);
++
++    /**
++     * Create an index and add to alias, will create alias and remove any old index from write alias if alias already exists
++     * @param indexSuffix index name
++     * @param shards
++     * @param replicas
++     */
++    public void addIndex(final String indexSuffix, final int shards, final int replicas);
++
++    /**
++     * type of alias
++     */
++    public enum AliasType {
++        Read, Write
++    }
++}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/21630f49/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
index f38a390,4a653b3..549f87e
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
@@@ -38,31 -38,24 +38,12 @@@ public interface EntityIndex 
       */
      public void initializeIndex();
  
--    /**
--     * Create an index and add to alias, will create alias and remove any old index from write alias if alias already exists
--     * @param indexSuffix index name
--     * @param shards
--     * @param replicas
--     */
--    public void addIndex(final String indexSuffix, final int shards, final int replicas);
- 
-     /**
-      * Get the indexes for an alias
-      * @param aliasName name of alias
-      * @return list of index names
-      */
-     public String[] getIndexes(final AliasType aliasType);
  
      /**
       * Create the index batch.
       */
      public EntityIndexBatch createBatch();
  
--    /**
--     * Add alias to index, will remove old index from write alias
--     * @param indexSuffix must be different than current index
--     */
--    public void addAlias(final String indexSuffix);
  
      /**
       * Execute query in Usergrid syntax.

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/21630f49/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index f2cbc6b,a3ca8a5..0d0e52b
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@@ -94,11 -92,9 +94,11 @@@ public class EsEntityIndexBatchImpl imp
  
      private final FailureMonitor failureMonitor;
  
-     private final EntityIndex entityIndex;
++    private final AliasedEntityIndex entityIndex;
 +
  
      public EsEntityIndexBatchImpl( final ApplicationScope applicationScope, final Client client, 
-             final IndexFig config, final int autoFlushSize, final FailureMonitor failureMonitor, final EntityIndex entityIndex ) {
 -            final IndexFig config, final int autoFlushSize, final FailureMonitor failureMonitor ) {
++            final IndexFig config, final int autoFlushSize, final FailureMonitor failureMonitor, final AliasedEntityIndex entityIndex ) {
  
          this.applicationScope = applicationScope;
          this.client = client;
@@@ -186,30 -181,9 +186,30 @@@
  
  
          log.debug( "De-indexing type {} with documentId '{}'" , entityType, indexId);
-         String[] indexes = entityIndex.getIndexes(EntityIndex.AliasType.Read);
 -
 -        bulkRequest.add( client.prepareDelete(alias.getWriteAlias(), entityType, indexId ).setRefresh( refresh ) );
 -
++        String[] indexes = entityIndex.getIndexes(AliasedEntityIndex.AliasType.Read);
 +        //get the default index if no alias exists yet
 +        if(indexes == null ||indexes.length == 0){
 +            indexes = new String[]{indexIdentifier.getIndex(null)};
 +        }
 +        final AtomicInteger errorCount = new AtomicInteger();
 +        //get all indexes then flush everyone
 +        Observable.from(indexes).subscribeOn(Schedulers.io())
 +               .map(new Func1<String, Object>() {
 +                   @Override
 +                   public Object call(String index) {
 +                       try {
 +                           bulkRequest.add(client.prepareDelete(index, entityType, indexId).setRefresh(refresh));
 +                       }catch (Exception e){
 +                           log.error("failed to deindex",e);
 +                           errorCount.incrementAndGet();
 +                       }
 +                       return index;
 +                   }
 +               }).toBlocking().last();
 +
 +        if(errorCount.get()>0){
 +            log.error("Failed to flush some indexes");
 +        }
          log.debug( "Deindexed Entity with index id " + indexId );
  
          maybeFlush();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/21630f49/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 5824c38,eb050d6..4bca8c2
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@@ -73,6 -72,6 +72,9 @@@ import org.apache.usergrid.persistence.
  import com.google.common.collect.ImmutableMap;
  import com.google.inject.Inject;
  import com.google.inject.assistedinject.Assisted;
++import rx.Observable;
++import rx.functions.Func1;
++import rx.schedulers.Schedulers;
  
  import static org.apache.usergrid.persistence.index.impl.IndexingUtils.BOOLEAN_PREFIX;
  import static org.apache.usergrid.persistence.index.impl.IndexingUtils.NUMBER_PREFIX;
@@@ -83,7 -82,7 +85,7 @@@ import static org.apache.usergrid.persi
  /**
   * Implements index using ElasticSearch Java API.
   */
--public class EsEntityIndexImpl implements EntityIndex {
++public class EsEntityIndexImpl implements AliasedEntityIndex {
  
      private static final Logger logger = LoggerFactory.getLogger( EsEntityIndexImpl.class );
  
@@@ -178,18 -184,27 +187,24 @@@
              Boolean isAck;
              String indexName = indexIdentifier.getIndex(indexSuffix);
              final AdminClient adminClient = esProvider.getClient().admin();
+ 
 -            //remove write alias, can only have one
 -            ImmutableOpenMap<String,List<AliasMetaData>> aliasMap = adminClient.indices()
 -                    .getAliases(new GetAliasesRequest(alias.getWriteAlias())).actionGet().getAliases();
 -            String[] indexNames = aliasMap.keys().toArray(String.class);
 +            String[] indexNames = getIndexes(alias.getWriteAlias());
  
              for(String currentIndex : indexNames){
-                 isAck = adminClient.indices().prepareAliases().removeAlias(currentIndex,alias.getWriteAlias()).execute().actionGet().isAcknowledged();
-                 logger.info("Removed Index Name [{}] from Alias=[{}] ACK=[{}]",currentIndex, alias, isAck);
+                 isAck = adminClient.indices().prepareAliases().removeAlias(currentIndex,
+                         alias.getWriteAlias()).execute().actionGet().isAcknowledged();
  
+                 logger.info("Removed Index Name [{}] from Alias=[{}] ACK=[{}]",currentIndex, alias, isAck);
              }
+ 
              //add read alias
-             isAck = adminClient.indices().prepareAliases().addAlias(indexName, alias.getReadAlias()).execute().actionGet().isAcknowledged();
+             isAck = adminClient.indices().prepareAliases().addAlias(
+                     indexName, alias.getReadAlias()).execute().actionGet().isAcknowledged();
              logger.info("Created new read Alias Name [{}] ACK=[{}]", alias, isAck);
+ 
              //add write alias
-             isAck = adminClient.indices().prepareAliases().addAlias(indexName, alias.getWriteAlias()).execute().actionGet().isAcknowledged();
+             isAck = adminClient.indices().prepareAliases().addAlias(
+                     indexName, alias.getWriteAlias()).execute().actionGet().isAcknowledged();
              logger.info("Created new write Alias Name [{}] ACK=[{}]", alias, isAck);
  
          } catch (Exception e) {
@@@ -233,17 -231,19 +248,19 @@@
              public boolean doOp() {
                  final String tempId = UUIDGenerator.newTimeUUID().toString();
  
-                 esProvider.getClient().prepareIndex( alias.getWriteAlias(), VERIFY_TYPE, tempId ).setSource( DEFAULT_PAYLOAD )
-                           .get();
+                 esProvider.getClient().prepareIndex( alias.getWriteAlias(), VERIFY_TYPE, tempId )
 -                        .setSource( DEFAULT_PAYLOAD ).get();
++                        .setSource(DEFAULT_PAYLOAD).get();
  
-                 logger.info( "Successfully created new document with docId {} in index {} and type {}", tempId,
-                         alias, VERIFY_TYPE );
+                 logger.info( "Successfully created new document with docId {} "
+                         + "in index {} and type {}", tempId, alias, VERIFY_TYPE );
  
                  // delete all types, this way if we miss one it will get cleaned up
-                 esProvider.getClient().prepareDeleteByQuery( alias.getWriteAlias() ).setTypes(VERIFY_TYPE)
-                           .setQuery( MATCH_ALL_QUERY_BUILDER ).get();
+                 esProvider.getClient().prepareDeleteByQuery( alias.getWriteAlias() )
+                         .setTypes(VERIFY_TYPE) 
 -                        .setQuery( MATCH_ALL_QUERY_BUILDER ).get();
++                        .setQuery(MATCH_ALL_QUERY_BUILDER).get();
  
-                 logger.info( "Successfully deleted all documents in index {} and type {}", alias, VERIFY_TYPE );
+                 logger.info( "Successfully deleted all documents in index {} and type {}", 
+                         alias, VERIFY_TYPE );
  
  
                  return true;
@@@ -440,7 -440,8 +457,15 @@@
              @Override
              public boolean doOp() {
                  try {
-                     esProvider.getClient().admin().indices().prepareRefresh( alias.getReadAlias() ).execute().actionGet();
 -                    esProvider.getClient().admin().indices().prepareRefresh( alias.getWriteAlias() )
 -                            .execute().actionGet();
++                    String[] indexes = getIndexes(AliasType.Read);
++                    Observable.from(indexes).subscribeOn(Schedulers.io()).map(new Func1<String, String>() {
++                        @Override
++                        public String call(String index) {
++                            esProvider.getClient().admin().indices().prepareRefresh( index ).execute().actionGet();
++                            return index;
++                        }
++                    });
++
                      logger.debug( "Refreshed index: " + alias);
  
                      return true;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/21630f49/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index 47c4dbd,13d1552..fd94985
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@@ -25,7 -25,6 +25,8 @@@ import java.util.LinkedHashMap
  import java.util.List;
  import java.util.Map;
  
++import org.apache.usergrid.persistence.index.*;
 +import org.apache.usergrid.persistence.index.query.CandidateResult;
  import org.junit.Test;
  import org.junit.runner.RunWith;
  import org.slf4j.Logger;
@@@ -38,11 -37,11 +39,6 @@@ import org.apache.usergrid.persistence.
  import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
  import org.apache.usergrid.persistence.core.test.UseModules;
  import org.apache.usergrid.persistence.core.util.Health;
--import org.apache.usergrid.persistence.index.EntityIndex;
--import org.apache.usergrid.persistence.index.EntityIndexBatch;
--import org.apache.usergrid.persistence.index.EntityIndexFactory;
--import org.apache.usergrid.persistence.index.IndexScope;
--import org.apache.usergrid.persistence.index.SearchTypes;
  import org.apache.usergrid.persistence.index.guice.TestIndexModule;
  import org.apache.usergrid.persistence.index.query.CandidateResults;
  import org.apache.usergrid.persistence.index.query.Query;
@@@ -115,7 -114,7 +111,7 @@@ public class EntityIndexTest extends Ba
  
          ApplicationScope applicationScope = new ApplicationScopeImpl( appId );
  
--        EntityIndex entityIndex = eif.createEntityIndex( applicationScope );
++        AliasedEntityIndex entityIndex =(AliasedEntityIndex) eif.createEntityIndex( applicationScope );
          entityIndex.initializeIndex();
  
          final String entityType = "thing";
@@@ -140,41 -139,6 +136,42 @@@
          testQuery(indexScope, searchTypes, entityIndex, "name = 'Lowe Kelley'", 1 );
      }
  
 +    @Test
 +    public void testDeleteByQueryWithAlias() throws IOException {
 +        Id appId = new SimpleId( "application" );
 +
 +        ApplicationScope applicationScope = new ApplicationScopeImpl( appId );
 +
-         EntityIndex entityIndex = eif.createEntityIndex(applicationScope);
++        AliasedEntityIndex entityIndex =(AliasedEntityIndex) eif.createEntityIndex( applicationScope );
++
 +        entityIndex.initializeIndex();
 +
 +        final String entityType = "thing";
 +        IndexScope indexScope = new IndexScopeImpl( appId, "things" );
 +        final SearchTypes searchTypes = SearchTypes.fromTypes( entityType );
 +
 +        insertJsonBlob(entityIndex, entityType, indexScope, "/sample-large.json",1,0);
 +
 +        entityIndex.refresh();
 +
-         entityIndex.addIndex("v2", 1,0);
++        entityIndex.addIndex("v2", 1, 0);
 +
-         insertJsonBlob(entityIndex, entityType, indexScope, "/sample-large.json",1,0);
++        insertJsonBlob(entityIndex, entityType, indexScope, "/sample-large.json", 1, 0);
 +
 +        entityIndex.refresh();
 +        CandidateResults crs = testQuery(indexScope, searchTypes, entityIndex, "name = 'Bowers Oneil'", 2);
 +
 +        EntityIndexBatch entityIndexBatch = entityIndex.createBatch();
 +        entityIndexBatch.deindex(indexScope, crs.get(0));
 +        entityIndexBatch.deindex(indexScope, crs.get(1));
 +        entityIndexBatch.executeAndRefresh();
 +        entityIndex.refresh();
 +
 +        //Hilda Youn
 +        testQuery(indexScope, searchTypes, entityIndex, "name = 'Bowers Oneil'", 0);
 +
 +    }
 +
      private void insertJsonBlob(EntityIndex entityIndex, String entityType, IndexScope indexScope, String filePath,final int max,final int startIndex) throws IOException {
          InputStream is = this.getClass().getResourceAsStream( filePath );
          ObjectMapper mapper = new ObjectMapper();