You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/12/12 15:30:13 UTC
[14/50] [abbrv] incubator-usergrid git commit: merge
merge
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/21630f49
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/21630f49
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/21630f49
Branch: refs/heads/no-source-in-es
Commit: 21630f498e7f59cd760e0fd2cd69c1f38de25f45
Parents: 8cdfaf9 a63b856
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Dec 8 17:23:52 2014 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Dec 8 17:23:52 2014 -0700
----------------------------------------------------------------------
.../corepersistence/CpEntityManager.java | 230 ++++++++++---------
.../corepersistence/CpEntityManagerFactory.java | 6 +-
.../corepersistence/util/CpEntityMapUtils.java | 32 ++-
.../batch/job/SchedulerRuntimeIntervalIT.java | 49 ++--
.../migration/EntityDataMigrationIT.java | 139 +++++------
.../migration/EntityTypeMappingMigrationIT.java | 89 +++----
.../apache/usergrid/persistence/CounterIT.java | 89 ++++---
.../usergrid/persistence/CountingMutatorIT.java | 4 +-
.../persistence/EntityDictionaryIT.java | 11 +-
.../usergrid/persistence/EntityManagerIT.java | 47 ++--
.../persistence/GeoQueryBooleanTest.java | 7 +-
.../apache/usergrid/persistence/IndexIT.java | 14 +-
.../usergrid/persistence/PathQueryIT.java | 10 +-
.../PerformanceEntityRebuildIndexTest.java | 6 +-
.../usergrid/persistence/PermissionsIT.java | 5 +-
.../cassandra/EntityManagerFactoryImplIT.java | 9 +-
.../query/AbstractIteratingQueryIT.java | 3 +-
stack/core/src/test/resources/log4j.properties | 5 +-
.../data/DataMigrationManagerImpl.java | 29 ++-
.../persistence/index/AliasedEntityIndex.java | 55 +++++
.../usergrid/persistence/index/EntityIndex.java | 26 ---
.../persistence/index/IndexIdentifier.java | 4 +
.../index/impl/EsEntityIndexBatchImpl.java | 6 +-
.../index/impl/EsEntityIndexImpl.java | 88 ++++---
.../persistence/index/impl/EntityIndexTest.java | 15 +-
.../persistence/queue/DefaultQueueManager.java | 68 ++++++
.../usergrid/services/TestQueueManager.java | 65 ------
.../AbstractServiceNotificationIT.java | 16 +-
.../apns/NotificationsServiceIT.java | 12 +-
.../gcm/NotificationsServiceIT.java | 15 +-
30 files changed, 647 insertions(+), 507 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/21630f49/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index f6b0df8,f6b0df8..56eb258
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@@ -64,6 -64,6 +64,7 @@@ import org.apache.usergrid.persistence.
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
++import org.apache.usergrid.persistence.index.AliasedEntityIndex;
import org.apache.usergrid.persistence.index.EntityIndex;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.query.Query;
@@@ -702,7 -702,7 +703,10 @@@ public class CpEntityManagerFactory imp
@Override
public void addIndex(final UUID applicationId,final String indexSuffix,final int shards,final int replicas){
-- getManagerCache().getEntityIndex(CpNamingUtils.getApplicationScope( applicationId )).addIndex(indexSuffix,shards,replicas);
++ EntityIndex entityIndex = getManagerCache().getEntityIndex(CpNamingUtils.getApplicationScope(applicationId));
++ if(entityIndex instanceof AliasedEntityIndex) {
++ ((AliasedEntityIndex)entityIndex).addIndex(indexSuffix, shards, replicas);
++ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/21630f49/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/AliasedEntityIndex.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/AliasedEntityIndex.java
index 0000000,0000000..15a0d45
new file mode 100644
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/AliasedEntityIndex.java
@@@ -1,0 -1,0 +1,55 @@@
++/*
++ *
++ * * Licensed to the Apache Software Foundation (ASF) under one or more
++ * * contributor license agreements. The ASF licenses this file to You
++ * * under the Apache License, Version 2.0 (the "License"); you may not
++ * * use this file except in compliance with the License.
++ * * You may obtain a copy of the License at
++ * *
++ * * http://www.apache.org/licenses/LICENSE-2.0
++ * *
++ * * Unless required by applicable law or agreed to in writing, software
++ * * distributed under the License is distributed on an "AS IS" BASIS,
++ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * * See the License for the specific language governing permissions and
++ * * limitations under the License. For additional information regarding
++ * * copyright in this work, please see the NOTICE file in the top level
++ * * directory of this distribution.
++ *
++ */
++
++package org.apache.usergrid.persistence.index;
++
++/**
++ * EntityIndex with aliases for multiple indexes
++ */
++public interface AliasedEntityIndex extends EntityIndex{
++
++ /**
++ * Get the indexes for an alias
++ * @param aliasType name of alias
++ * @return list of index names
++ */
++ public String[] getIndexes(final AliasType aliasType);
++
++ /**
++ * Add alias to index, will remove old index from write alias
++ * @param indexSuffix must be different than current index
++ */
++ public void addAlias(final String indexSuffix);
++
++ /**
++ * Create an index and add to alias, will create alias and remove any old index from write alias if alias already exists
++ * @param indexSuffix index name
++ * @param shards
++ * @param replicas
++ */
++ public void addIndex(final String indexSuffix, final int shards, final int replicas);
++
++ /**
++ * type of alias
++ */
++ public enum AliasType {
++ Read, Write
++ }
++}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/21630f49/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
index f38a390,4a653b3..549f87e
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
@@@ -38,31 -38,24 +38,12 @@@ public interface EntityIndex
*/
public void initializeIndex();
-- /**
-- * Create an index and add to alias, will create alias and remove any old index from write alias if alias already exists
-- * @param indexSuffix index name
-- * @param shards
-- * @param replicas
-- */
-- public void addIndex(final String indexSuffix, final int shards, final int replicas);
-
- /**
- * Get the indexes for an alias
- * @param aliasName name of alias
- * @return list of index names
- */
- public String[] getIndexes(final AliasType aliasType);
/**
* Create the index batch.
*/
public EntityIndexBatch createBatch();
-- /**
-- * Add alias to index, will remove old index from write alias
-- * @param indexSuffix must be different than current index
-- */
-- public void addAlias(final String indexSuffix);
/**
* Execute query in Usergrid syntax.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/21630f49/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index f2cbc6b,a3ca8a5..0d0e52b
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@@ -94,11 -92,9 +94,11 @@@ public class EsEntityIndexBatchImpl imp
private final FailureMonitor failureMonitor;
- private final EntityIndex entityIndex;
++ private final AliasedEntityIndex entityIndex;
+
public EsEntityIndexBatchImpl( final ApplicationScope applicationScope, final Client client,
- final IndexFig config, final int autoFlushSize, final FailureMonitor failureMonitor, final EntityIndex entityIndex ) {
- final IndexFig config, final int autoFlushSize, final FailureMonitor failureMonitor ) {
++ final IndexFig config, final int autoFlushSize, final FailureMonitor failureMonitor, final AliasedEntityIndex entityIndex ) {
this.applicationScope = applicationScope;
this.client = client;
@@@ -186,30 -181,9 +186,30 @@@
log.debug( "De-indexing type {} with documentId '{}'" , entityType, indexId);
- String[] indexes = entityIndex.getIndexes(EntityIndex.AliasType.Read);
-
- bulkRequest.add( client.prepareDelete(alias.getWriteAlias(), entityType, indexId ).setRefresh( refresh ) );
-
++ String[] indexes = entityIndex.getIndexes(AliasedEntityIndex.AliasType.Read);
+ //get the default index if no alias exists yet
+ if(indexes == null ||indexes.length == 0){
+ indexes = new String[]{indexIdentifier.getIndex(null)};
+ }
+ final AtomicInteger errorCount = new AtomicInteger();
+ //get all indexes then flush everyone
+ Observable.from(indexes).subscribeOn(Schedulers.io())
+ .map(new Func1<String, Object>() {
+ @Override
+ public Object call(String index) {
+ try {
+ bulkRequest.add(client.prepareDelete(index, entityType, indexId).setRefresh(refresh));
+ }catch (Exception e){
+ log.error("failed to deindex",e);
+ errorCount.incrementAndGet();
+ }
+ return index;
+ }
+ }).toBlocking().last();
+
+ if(errorCount.get()>0){
+ log.error("Failed to flush some indexes");
+ }
log.debug( "Deindexed Entity with index id " + indexId );
maybeFlush();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/21630f49/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 5824c38,eb050d6..4bca8c2
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@@ -73,6 -72,6 +72,9 @@@ import org.apache.usergrid.persistence.
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
++import rx.Observable;
++import rx.functions.Func1;
++import rx.schedulers.Schedulers;
import static org.apache.usergrid.persistence.index.impl.IndexingUtils.BOOLEAN_PREFIX;
import static org.apache.usergrid.persistence.index.impl.IndexingUtils.NUMBER_PREFIX;
@@@ -83,7 -82,7 +85,7 @@@ import static org.apache.usergrid.persi
/**
* Implements index using ElasticSearch Java API.
*/
--public class EsEntityIndexImpl implements EntityIndex {
++public class EsEntityIndexImpl implements AliasedEntityIndex {
private static final Logger logger = LoggerFactory.getLogger( EsEntityIndexImpl.class );
@@@ -178,18 -184,27 +187,24 @@@
Boolean isAck;
String indexName = indexIdentifier.getIndex(indexSuffix);
final AdminClient adminClient = esProvider.getClient().admin();
+
- //remove write alias, can only have one
- ImmutableOpenMap<String,List<AliasMetaData>> aliasMap = adminClient.indices()
- .getAliases(new GetAliasesRequest(alias.getWriteAlias())).actionGet().getAliases();
- String[] indexNames = aliasMap.keys().toArray(String.class);
+ String[] indexNames = getIndexes(alias.getWriteAlias());
for(String currentIndex : indexNames){
- isAck = adminClient.indices().prepareAliases().removeAlias(currentIndex,alias.getWriteAlias()).execute().actionGet().isAcknowledged();
- logger.info("Removed Index Name [{}] from Alias=[{}] ACK=[{}]",currentIndex, alias, isAck);
+ isAck = adminClient.indices().prepareAliases().removeAlias(currentIndex,
+ alias.getWriteAlias()).execute().actionGet().isAcknowledged();
+ logger.info("Removed Index Name [{}] from Alias=[{}] ACK=[{}]",currentIndex, alias, isAck);
}
+
//add read alias
- isAck = adminClient.indices().prepareAliases().addAlias(indexName, alias.getReadAlias()).execute().actionGet().isAcknowledged();
+ isAck = adminClient.indices().prepareAliases().addAlias(
+ indexName, alias.getReadAlias()).execute().actionGet().isAcknowledged();
logger.info("Created new read Alias Name [{}] ACK=[{}]", alias, isAck);
+
//add write alias
- isAck = adminClient.indices().prepareAliases().addAlias(indexName, alias.getWriteAlias()).execute().actionGet().isAcknowledged();
+ isAck = adminClient.indices().prepareAliases().addAlias(
+ indexName, alias.getWriteAlias()).execute().actionGet().isAcknowledged();
logger.info("Created new write Alias Name [{}] ACK=[{}]", alias, isAck);
} catch (Exception e) {
@@@ -233,17 -231,19 +248,19 @@@
public boolean doOp() {
final String tempId = UUIDGenerator.newTimeUUID().toString();
- esProvider.getClient().prepareIndex( alias.getWriteAlias(), VERIFY_TYPE, tempId ).setSource( DEFAULT_PAYLOAD )
- .get();
+ esProvider.getClient().prepareIndex( alias.getWriteAlias(), VERIFY_TYPE, tempId )
- .setSource( DEFAULT_PAYLOAD ).get();
++ .setSource(DEFAULT_PAYLOAD).get();
- logger.info( "Successfully created new document with docId {} in index {} and type {}", tempId,
- alias, VERIFY_TYPE );
+ logger.info( "Successfully created new document with docId {} "
+ + "in index {} and type {}", tempId, alias, VERIFY_TYPE );
// delete all types, this way if we miss one it will get cleaned up
- esProvider.getClient().prepareDeleteByQuery( alias.getWriteAlias() ).setTypes(VERIFY_TYPE)
- .setQuery( MATCH_ALL_QUERY_BUILDER ).get();
+ esProvider.getClient().prepareDeleteByQuery( alias.getWriteAlias() )
+ .setTypes(VERIFY_TYPE)
- .setQuery( MATCH_ALL_QUERY_BUILDER ).get();
++ .setQuery(MATCH_ALL_QUERY_BUILDER).get();
- logger.info( "Successfully deleted all documents in index {} and type {}", alias, VERIFY_TYPE );
+ logger.info( "Successfully deleted all documents in index {} and type {}",
+ alias, VERIFY_TYPE );
return true;
@@@ -440,7 -440,8 +457,15 @@@
@Override
public boolean doOp() {
try {
- esProvider.getClient().admin().indices().prepareRefresh( alias.getReadAlias() ).execute().actionGet();
- esProvider.getClient().admin().indices().prepareRefresh( alias.getWriteAlias() )
- .execute().actionGet();
++ String[] indexes = getIndexes(AliasType.Read);
++ Observable.from(indexes).subscribeOn(Schedulers.io()).map(new Func1<String, String>() {
++ @Override
++ public String call(String index) {
++ esProvider.getClient().admin().indices().prepareRefresh( index ).execute().actionGet();
++ return index;
++ }
++ });
++
logger.debug( "Refreshed index: " + alias);
return true;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/21630f49/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index 47c4dbd,13d1552..fd94985
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@@ -25,7 -25,6 +25,8 @@@ import java.util.LinkedHashMap
import java.util.List;
import java.util.Map;
++import org.apache.usergrid.persistence.index.*;
+import org.apache.usergrid.persistence.index.query.CandidateResult;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
@@@ -38,11 -37,11 +39,6 @@@ import org.apache.usergrid.persistence.
import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
import org.apache.usergrid.persistence.core.test.UseModules;
import org.apache.usergrid.persistence.core.util.Health;
--import org.apache.usergrid.persistence.index.EntityIndex;
--import org.apache.usergrid.persistence.index.EntityIndexBatch;
--import org.apache.usergrid.persistence.index.EntityIndexFactory;
--import org.apache.usergrid.persistence.index.IndexScope;
--import org.apache.usergrid.persistence.index.SearchTypes;
import org.apache.usergrid.persistence.index.guice.TestIndexModule;
import org.apache.usergrid.persistence.index.query.CandidateResults;
import org.apache.usergrid.persistence.index.query.Query;
@@@ -115,7 -114,7 +111,7 @@@ public class EntityIndexTest extends Ba
ApplicationScope applicationScope = new ApplicationScopeImpl( appId );
-- EntityIndex entityIndex = eif.createEntityIndex( applicationScope );
++ AliasedEntityIndex entityIndex =(AliasedEntityIndex) eif.createEntityIndex( applicationScope );
entityIndex.initializeIndex();
final String entityType = "thing";
@@@ -140,41 -139,6 +136,42 @@@
testQuery(indexScope, searchTypes, entityIndex, "name = 'Lowe Kelley'", 1 );
}
+ @Test
+ public void testDeleteByQueryWithAlias() throws IOException {
+ Id appId = new SimpleId( "application" );
+
+ ApplicationScope applicationScope = new ApplicationScopeImpl( appId );
+
- EntityIndex entityIndex = eif.createEntityIndex(applicationScope);
++ AliasedEntityIndex entityIndex =(AliasedEntityIndex) eif.createEntityIndex( applicationScope );
++
+ entityIndex.initializeIndex();
+
+ final String entityType = "thing";
+ IndexScope indexScope = new IndexScopeImpl( appId, "things" );
+ final SearchTypes searchTypes = SearchTypes.fromTypes( entityType );
+
+ insertJsonBlob(entityIndex, entityType, indexScope, "/sample-large.json",1,0);
+
+ entityIndex.refresh();
+
- entityIndex.addIndex("v2", 1,0);
++ entityIndex.addIndex("v2", 1, 0);
+
- insertJsonBlob(entityIndex, entityType, indexScope, "/sample-large.json",1,0);
++ insertJsonBlob(entityIndex, entityType, indexScope, "/sample-large.json", 1, 0);
+
+ entityIndex.refresh();
+ CandidateResults crs = testQuery(indexScope, searchTypes, entityIndex, "name = 'Bowers Oneil'", 2);
+
+ EntityIndexBatch entityIndexBatch = entityIndex.createBatch();
+ entityIndexBatch.deindex(indexScope, crs.get(0));
+ entityIndexBatch.deindex(indexScope, crs.get(1));
+ entityIndexBatch.executeAndRefresh();
+ entityIndex.refresh();
+
+ //Hilda Youn
+ testQuery(indexScope, searchTypes, entityIndex, "name = 'Bowers Oneil'", 0);
+
+ }
+
private void insertJsonBlob(EntityIndex entityIndex, String entityType, IndexScope indexScope, String filePath,final int max,final int startIndex) throws IOException {
InputStream is = this.getClass().getResourceAsStream( filePath );
ObjectMapper mapper = new ObjectMapper();