You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/11/25 00:55:51 UTC
incubator-usergrid git commit: add write_alias and read_alias
Repository: incubator-usergrid
Updated Branches:
refs/heads/index-alias 486c0d251 -> af16be295
add write_alias and read_alias
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/af16be29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/af16be29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/af16be29
Branch: refs/heads/index-alias
Commit: af16be295a880c3bb62dbc013c656380b5fe7537
Parents: 486c0d2
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Nov 24 16:55:33 2014 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Nov 24 16:55:33 2014 -0700
----------------------------------------------------------------------
.../persistence/index/IndexIdentifier.java | 21 ++++-
.../index/impl/EsEntityIndexBatchImpl.java | 8 +-
.../index/impl/EsEntityIndexImpl.java | 88 +++++++++-----------
3 files changed, 64 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af16be29/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexIdentifier.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexIdentifier.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexIdentifier.java
index 58e6ee5..82a6023 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexIdentifier.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexIdentifier.java
@@ -39,8 +39,8 @@ public class IndexIdentifier{
* Get the alias name
* @return
*/
- public String getAlias() {
- return getIndexBase() + "_" + config.getAliasPostfix();
+ public IndexAlias getAlias() {
+ return new IndexAlias(config,getIndexBase());
}
/**
@@ -67,5 +67,22 @@ public class IndexIdentifier{
return sb.toString();
}
+ public class IndexAlias{
+ private final String readAlias;
+ private final String writeAlias;
+
+ public IndexAlias(IndexFig indexFig,String indexBase) {
+ this.writeAlias = indexBase + "_write_" + indexFig.getAliasPostfix();
+ this.readAlias = indexBase + "_read_" + indexFig.getAliasPostfix();
+ }
+
+ public String getReadAlias() {
+ return readAlias;
+ }
+
+ public String getWriteAlias() {
+ return writeAlias;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af16be29/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 9589e66..0c2dec0 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -81,7 +81,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
private final boolean refresh;
- private final String aliasName;
+ private final IndexIdentifier.IndexAlias alias;
private final IndexIdentifier indexIdentifier;
private BulkRequestBuilder bulkRequest;
@@ -100,7 +100,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
this.client = client;
this.failureMonitor = failureMonitor;
this.indexIdentifier = IndexingUtils.createIndexIdentifier(config, applicationScope);
- this.aliasName = indexIdentifier.getAlias();
+ this.alias = indexIdentifier.getAlias();
this.refresh = config.isForcedRefresh();
this.autoFlushSize = autoFlushSize;
initBatch();
@@ -144,7 +144,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
log.debug( "Indexing entity documentId {} data {} ", indexId, entityAsMap );
- bulkRequest.add( client.prepareIndex( aliasName, entityType, indexId ).setSource( entityAsMap ) );
+ bulkRequest.add( client.prepareIndex(alias.getWriteAlias(), entityType, indexId ).setSource( entityAsMap ) );
maybeFlush();
@@ -182,7 +182,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
log.debug( "De-indexing type {} with documentId '{}'" , entityType, indexId);
- bulkRequest.add( client.prepareDelete( aliasName, entityType, indexId ).setRefresh( refresh ) );
+ bulkRequest.add( client.prepareDelete(alias.getReadAlias(), entityType, indexId ).setRefresh( refresh ) );
log.debug( "Deindexed Entity with index id " + indexId );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af16be29/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index f9623ce..540aec4 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -21,7 +21,6 @@ package org.apache.usergrid.persistence.index.impl;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -30,21 +29,13 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequest;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
-import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
-import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
-import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequestBuilder;
import org.elasticsearch.client.AdminClient;
-import org.elasticsearch.cluster.metadata.AliasMetaData;
-import org.elasticsearch.common.collect.ImmutableOpenMap;
-import org.elasticsearch.common.hppc.ObjectContainer;
-import org.elasticsearch.common.hppc.cursors.ObjectCursor;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -93,7 +84,7 @@ public class EsEntityIndexImpl implements EntityIndex {
private static final AtomicBoolean mappingsCreated = new AtomicBoolean( false );
- private final String aliasName;
+ private final IndexIdentifier.IndexAlias alias;
private final IndexIdentifier indexIdentifier;
/**
@@ -133,8 +124,8 @@ public class EsEntityIndexImpl implements EntityIndex {
this.esProvider = provider;
this.config = config;
this.cursorTimeout = config.getQueryCursorTimeout();
- this.indexIdentifier = IndexingUtils.createIndexIdentifier(config,appScope);
- this.aliasName = indexIdentifier.getAlias();
+ this.indexIdentifier = IndexingUtils.createIndexIdentifier(config, appScope);
+ this.alias = indexIdentifier.getAlias();
this.failureMonitor = new FailureMonitorImpl( config, provider );
}
@@ -176,19 +167,22 @@ public class EsEntityIndexImpl implements EntityIndex {
Settings settings = ImmutableSettings.settingsBuilder().put( "index.number_of_shards", numberOfShards)
.put( "index.number_of_replicas", numberOfReplicas ).build();
+ //TODO:swallow exception, look into setting up routing rules
String indexVersionName = indexIdentifier.getIndex(0);
- if(!admin.indices().exists(new IndicesExistsRequest(indexVersionName)).actionGet().isExists()) {
+ try {
final CreateIndexResponse cir = admin.indices().prepareCreate(indexVersionName).setSettings(settings).execute().actionGet();
- logger.info( "Created new Index Name [{}] ACK=[{}]", indexVersionName, cir.isAcknowledged() );
- }else{
- logger.info( " Index Name [{}] already exists", indexVersionName);
+ logger.info("Created new Index Name [{}] ACK=[{}]", indexVersionName, cir.isAcknowledged());
+ }catch(IndexAlreadyExistsException e){
+ logger.info("Index Name [{}] already exists",indexVersionName);
}
- //check if alias exists and get the alias
- if(!admin.indices().aliasesExist(new GetAliasesRequest(aliasName)).actionGet().exists()) {
- final Boolean isAck = admin.indices().prepareAliases().addAlias(indexVersionName, aliasName).execute().actionGet().isAcknowledged();
- logger.info( "Created new Alias Name [{}] ACK=[{}]", aliasName, isAck);
- }else{
- logger.info( " Alias Name [{}] already exists", aliasName);
+ try { //check if alias exists and get the alias
+ Boolean isAck = admin.indices().prepareAliases().addAlias(indexVersionName, alias.getReadAlias()).execute().actionGet().isAcknowledged();
+ logger.info("Created new read Alias Name [{}] ACK=[{}]", alias, isAck);
+
+ isAck = admin.indices().prepareAliases().addAlias(indexVersionName, alias.getWriteAlias()).execute().actionGet().isAcknowledged();
+ logger.info("Created new write Alias Name [{}] ACK=[{}]", alias, isAck);
+ } catch (Exception e) {
+ logger.warn("Failed to create alias ", e);
}
}
@@ -200,24 +194,24 @@ public class EsEntityIndexImpl implements EntityIndex {
private void testNewIndex() {
- logger.info( "Refreshing Created new Index Name [{}]", aliasName );
+ logger.info( "Refreshing Created new Index Name [{}]", alias);
final RetryOperation retryOperation = new RetryOperation() {
@Override
public boolean doOp() {
final String tempId = UUIDGenerator.newTimeUUID().toString();
- esProvider.getClient().prepareIndex( aliasName, VERIFY_TYPE, tempId ).setSource( DEFAULT_PAYLOAD )
+ esProvider.getClient().prepareIndex( alias.getWriteAlias(), VERIFY_TYPE, tempId ).setSource( DEFAULT_PAYLOAD )
.get();
logger.info( "Successfully created new document with docId {} in index {} and type {}", tempId,
- aliasName, VERIFY_TYPE );
+ alias, VERIFY_TYPE );
// delete all types, this way if we miss one it will get cleaned up
- esProvider.getClient().prepareDeleteByQuery( aliasName ).setTypes( VERIFY_TYPE )
+ esProvider.getClient().prepareDeleteByQuery( alias.getWriteAlias() ).setTypes(VERIFY_TYPE)
.setQuery( MATCH_ALL_QUERY_BUILDER ).get();
- logger.info( "Successfully deleted all documents in index {} and type {}", aliasName, VERIFY_TYPE );
+ logger.info( "Successfully deleted all documents in index {} and type {}", alias, VERIFY_TYPE );
return true;
@@ -234,8 +228,8 @@ public class EsEntityIndexImpl implements EntityIndex {
*/
private void createMappings() throws IOException {
- XContentBuilder xcb = IndexingUtils.createDoubleStringIndexMapping(
- XContentFactory.jsonBuilder(), "_default_" );
+ XContentBuilder xcb = IndexingUtils.createDoubleStringIndexMapping(
+ XContentFactory.jsonBuilder(), "_default_");
PutIndexTemplateResponse pitr = esProvider.getClient().admin().indices()
.preparePutTemplate("usergrid_template")
@@ -260,7 +254,7 @@ public class EsEntityIndexImpl implements EntityIndex {
public CandidateResults search( final IndexScope indexScope, final SearchTypes searchTypes,
final Query query ) {
- final String context = IndexingUtils.createContextName( indexScope );
+ final String context = IndexingUtils.createContextName(indexScope);
final String[] entityTypes = searchTypes.getTypeNames();
QueryBuilder qb = query.createQueryBuilder( context );
@@ -269,8 +263,8 @@ public class EsEntityIndexImpl implements EntityIndex {
SearchResponse searchResponse;
if ( query.getCursor() == null ) {
- SearchRequestBuilder srb = esProvider.getClient().prepareSearch( aliasName ).setTypes( entityTypes )
- .setScroll( cursorTimeout + "m" ).setQuery( qb );
+ SearchRequestBuilder srb = esProvider.getClient().prepareSearch( alias.getReadAlias() ).setTypes(entityTypes)
+ .setScroll(cursorTimeout + "m").setQuery(qb);
@@ -324,7 +318,7 @@ public class EsEntityIndexImpl implements EntityIndex {
if ( logger.isDebugEnabled() ) {
logger.debug( "Searching index {}\n scope{} \n type {}\n query {} ", new Object[] {
- this.aliasName, context, entityTypes, srb
+ this.alias, context, entityTypes, srb
} );
}
@@ -368,7 +362,7 @@ public class EsEntityIndexImpl implements EntityIndex {
failureMonitor.success();
}
- return parseResults( searchResponse, query );
+ return parseResults(searchResponse, query);
}
@@ -398,7 +392,7 @@ public class EsEntityIndexImpl implements EntityIndex {
if ( candidates.size() >= query.getLimit() ) {
candidateResults.setCursor( searchResponse.getScrollId() );
- logger.debug( " Cursor = " + searchResponse.getScrollId() );
+ logger.debug(" Cursor = " + searchResponse.getScrollId());
}
return candidateResults;
@@ -408,14 +402,14 @@ public class EsEntityIndexImpl implements EntityIndex {
public void refresh() {
- logger.info( "Refreshing Created new Index Name [{}]", aliasName );
+ logger.info( "Refreshing Created new Index Name [{}]", alias);
final RetryOperation retryOperation = new RetryOperation() {
@Override
public boolean doOp() {
try {
- esProvider.getClient().admin().indices().prepareRefresh( aliasName ).execute().actionGet();
- logger.debug( "Refreshed index: " + aliasName );
+ esProvider.getClient().admin().indices().prepareRefresh( alias.getWriteAlias() ).execute().actionGet();
+ logger.debug( "Refreshed index: " + alias);
return true;
}
@@ -428,7 +422,7 @@ public class EsEntityIndexImpl implements EntityIndex {
doInRetry( retryOperation );
- logger.debug( "Refreshed index: " + aliasName );
+ logger.debug( "Refreshed index: " + alias);
}
@@ -447,15 +441,15 @@ public class EsEntityIndexImpl implements EntityIndex {
//since we don't have paging inputs, there's no point in executing a query for paging.
- final String context = IndexingUtils.createContextName( scope );
+ final String context = IndexingUtils.createContextName(scope);
final SearchTypes searchTypes = SearchTypes.fromTypes( id.getType() );
final QueryBuilder queryBuilder = QueryBuilders.termQuery( IndexingUtils.ENTITY_CONTEXT_FIELDNAME, context );
final SearchRequestBuilder srb =
- esProvider.getClient().prepareSearch( aliasName ).setTypes( searchTypes.getTypeNames() )
- .setScroll( cursorTimeout + "m" ).setQuery( queryBuilder );
+ esProvider.getClient().prepareSearch( alias.getReadAlias() ).setTypes(searchTypes.getTypeNames())
+ .setScroll(cursorTimeout + "m").setQuery(queryBuilder);
@@ -472,7 +466,7 @@ public class EsEntityIndexImpl implements EntityIndex {
failureMonitor.success();
- return parseResults( searchResponse, new Query( ) );
+ return parseResults(searchResponse, new Query());
}
@@ -481,12 +475,12 @@ public class EsEntityIndexImpl implements EntityIndex {
*/
public void deleteIndex() {
AdminClient adminClient = esProvider.getClient().admin();
- DeleteIndexResponse response = adminClient.indices().prepareDelete( aliasName ).get();
+ DeleteIndexResponse response = adminClient.indices().prepareDelete( indexIdentifier.getIndex(0) ).get();
if ( response.isAcknowledged() ) {
- logger.info( "Deleted index: " + aliasName );
+ logger.info( "Deleted index: " + alias);
}
else {
- logger.info( "Failed to delete index " + aliasName );
+ logger.info( "Failed to delete index " + alias);
}
}
@@ -545,7 +539,7 @@ public class EsEntityIndexImpl implements EntityIndex {
try {
ClusterHealthResponse chr = esProvider.getClient().admin().cluster()
- .health( new ClusterHealthRequest( new String[] { aliasName } ) )
+ .health(new ClusterHealthRequest(new String[]{indexIdentifier.getIndex(0)}))
.get();
return Health.valueOf( chr.getStatus().name() );
}