You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/11/25 00:55:51 UTC

incubator-usergrid git commit: add write_alias and read_alias

Repository: incubator-usergrid
Updated Branches:
  refs/heads/index-alias 486c0d251 -> af16be295


add write_alias and read_alias


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/af16be29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/af16be29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/af16be29

Branch: refs/heads/index-alias
Commit: af16be295a880c3bb62dbc013c656380b5fe7537
Parents: 486c0d2
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Nov 24 16:55:33 2014 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Nov 24 16:55:33 2014 -0700

----------------------------------------------------------------------
 .../persistence/index/IndexIdentifier.java      | 21 ++++-
 .../index/impl/EsEntityIndexBatchImpl.java      |  8 +-
 .../index/impl/EsEntityIndexImpl.java           | 88 +++++++++-----------
 3 files changed, 64 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af16be29/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexIdentifier.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexIdentifier.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexIdentifier.java
index 58e6ee5..82a6023 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexIdentifier.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexIdentifier.java
@@ -39,8 +39,8 @@ public class IndexIdentifier{
      * Get the alias name
      * @return
      */
-    public String getAlias() {
-        return getIndexBase() + "_" + config.getAliasPostfix();
+    public IndexAlias getAlias() {
+        return new IndexAlias(config,getIndexBase());
     }
 
     /**
@@ -67,5 +67,22 @@ public class IndexIdentifier{
         return sb.toString();
     }
 
+    public class IndexAlias{
+        private final String readAlias;
+        private final String writeAlias;
+
+        public IndexAlias(IndexFig indexFig,String indexBase) {
+            this.writeAlias = indexBase + "_write_" + indexFig.getAliasPostfix();
+            this.readAlias = indexBase + "_read_" + indexFig.getAliasPostfix();
+        }
+
+        public String getReadAlias() {
+            return readAlias;
+        }
+
+        public String getWriteAlias() {
+            return writeAlias;
+        }
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af16be29/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 9589e66..0c2dec0 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -81,7 +81,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
     private final boolean refresh;
 
-    private final String aliasName;
+    private final IndexIdentifier.IndexAlias alias;
     private final IndexIdentifier indexIdentifier;
 
     private BulkRequestBuilder bulkRequest;
@@ -100,7 +100,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
         this.client = client;
         this.failureMonitor = failureMonitor;
         this.indexIdentifier = IndexingUtils.createIndexIdentifier(config, applicationScope);
-        this.aliasName = indexIdentifier.getAlias();
+        this.alias = indexIdentifier.getAlias();
         this.refresh = config.isForcedRefresh();
         this.autoFlushSize = autoFlushSize;
         initBatch();
@@ -144,7 +144,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
         log.debug( "Indexing entity documentId {} data {} ", indexId, entityAsMap );
 
-        bulkRequest.add( client.prepareIndex( aliasName, entityType, indexId ).setSource( entityAsMap ) );
+        bulkRequest.add( client.prepareIndex(alias.getWriteAlias(), entityType, indexId ).setSource( entityAsMap ) );
 
         maybeFlush();
 
@@ -182,7 +182,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
         log.debug( "De-indexing type {} with documentId '{}'" , entityType, indexId);
 
-        bulkRequest.add( client.prepareDelete( aliasName, entityType, indexId ).setRefresh( refresh ) );
+        bulkRequest.add( client.prepareDelete(alias.getReadAlias(), entityType, indexId ).setRefresh( refresh ) );
 
         log.debug( "Deindexed Entity with index id " + indexId );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af16be29/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index f9623ce..540aec4 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -21,7 +21,6 @@ package org.apache.usergrid.persistence.index.impl;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -30,21 +29,13 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequest;
 import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
-import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
-import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
-import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
 import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.SearchScrollRequestBuilder;
 import org.elasticsearch.client.AdminClient;
-import org.elasticsearch.cluster.metadata.AliasMetaData;
-import org.elasticsearch.common.collect.ImmutableOpenMap;
-import org.elasticsearch.common.hppc.ObjectContainer;
-import org.elasticsearch.common.hppc.cursors.ObjectCursor;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -93,7 +84,7 @@ public class EsEntityIndexImpl implements EntityIndex {
 
     private static final AtomicBoolean mappingsCreated = new AtomicBoolean( false );
 
-    private final String aliasName;
+    private final IndexIdentifier.IndexAlias alias;
     private final IndexIdentifier indexIdentifier;
 
     /**
@@ -133,8 +124,8 @@ public class EsEntityIndexImpl implements EntityIndex {
         this.esProvider = provider;
         this.config = config;
         this.cursorTimeout = config.getQueryCursorTimeout();
-        this.indexIdentifier = IndexingUtils.createIndexIdentifier(config,appScope);
-        this.aliasName = indexIdentifier.getAlias();
+        this.indexIdentifier = IndexingUtils.createIndexIdentifier(config, appScope);
+        this.alias = indexIdentifier.getAlias();
         this.failureMonitor = new FailureMonitorImpl( config, provider );
     }
 
@@ -176,19 +167,22 @@ public class EsEntityIndexImpl implements EntityIndex {
         Settings settings = ImmutableSettings.settingsBuilder().put( "index.number_of_shards", numberOfShards)
                 .put( "index.number_of_replicas", numberOfReplicas ).build();
 
+        //TODO:swallow exception, look into setting up routing rules
         String indexVersionName =  indexIdentifier.getIndex(0);
-        if(!admin.indices().exists(new IndicesExistsRequest(indexVersionName)).actionGet().isExists()) {
+        try {
             final CreateIndexResponse cir = admin.indices().prepareCreate(indexVersionName).setSettings(settings).execute().actionGet();
-            logger.info( "Created new Index Name [{}] ACK=[{}]", indexVersionName, cir.isAcknowledged() );
-        }else{
-            logger.info( " Index Name [{}] already exists", indexVersionName);
+            logger.info("Created new Index Name [{}] ACK=[{}]", indexVersionName, cir.isAcknowledged());
+        }catch(IndexAlreadyExistsException e){
+            logger.info("Index Name [{}] already exists",indexVersionName);
         }
-        //check if alias exists and get the alias
-        if(!admin.indices().aliasesExist(new GetAliasesRequest(aliasName)).actionGet().exists()) {
-            final Boolean isAck = admin.indices().prepareAliases().addAlias(indexVersionName, aliasName).execute().actionGet().isAcknowledged();
-            logger.info( "Created new Alias Name [{}] ACK=[{}]", aliasName, isAck);
-        }else{
-            logger.info( " Alias Name [{}] already exists", aliasName);
+        try {   // add the read and write aliases to the index; any failure is only logged as a warning below
+            Boolean isAck = admin.indices().prepareAliases().addAlias(indexVersionName, alias.getReadAlias()).execute().actionGet().isAcknowledged();
+            logger.info("Created new read Alias Name [{}] ACK=[{}]", alias, isAck);
+
+            isAck = admin.indices().prepareAliases().addAlias(indexVersionName, alias.getWriteAlias()).execute().actionGet().isAcknowledged();
+            logger.info("Created new write Alias Name [{}] ACK=[{}]", alias, isAck);
+        } catch (Exception e) {
+            logger.warn("Failed to create alias ", e);
         }
     }
 
@@ -200,24 +194,24 @@ public class EsEntityIndexImpl implements EntityIndex {
     private void testNewIndex() {
 
 
-        logger.info( "Refreshing Created new Index Name [{}]", aliasName );
+        logger.info( "Refreshing Created new Index Name [{}]", alias);
 
         final RetryOperation retryOperation = new RetryOperation() {
             @Override
             public boolean doOp() {
                 final String tempId = UUIDGenerator.newTimeUUID().toString();
 
-                esProvider.getClient().prepareIndex( aliasName, VERIFY_TYPE, tempId ).setSource( DEFAULT_PAYLOAD )
+                esProvider.getClient().prepareIndex( alias.getWriteAlias(), VERIFY_TYPE, tempId ).setSource( DEFAULT_PAYLOAD )
                           .get();
 
                 logger.info( "Successfully created new document with docId {} in index {} and type {}", tempId,
-                        aliasName, VERIFY_TYPE );
+                        alias, VERIFY_TYPE );
 
                 // delete all types, this way if we miss one it will get cleaned up
-                esProvider.getClient().prepareDeleteByQuery( aliasName ).setTypes( VERIFY_TYPE )
+                esProvider.getClient().prepareDeleteByQuery( alias.getWriteAlias() ).setTypes(VERIFY_TYPE)
                           .setQuery( MATCH_ALL_QUERY_BUILDER ).get();
 
-                logger.info( "Successfully deleted all documents in index {} and type {}", aliasName, VERIFY_TYPE );
+                logger.info( "Successfully deleted all documents in index {} and type {}", alias, VERIFY_TYPE );
 
 
                 return true;
@@ -234,8 +228,8 @@ public class EsEntityIndexImpl implements EntityIndex {
      */
     private void createMappings() throws IOException {
 
-        XContentBuilder xcb = IndexingUtils.createDoubleStringIndexMapping( 
-                XContentFactory.jsonBuilder(), "_default_" );
+        XContentBuilder xcb = IndexingUtils.createDoubleStringIndexMapping(
+                XContentFactory.jsonBuilder(), "_default_");
 
         PutIndexTemplateResponse pitr = esProvider.getClient().admin().indices()
                 .preparePutTemplate("usergrid_template")
@@ -260,7 +254,7 @@ public class EsEntityIndexImpl implements EntityIndex {
     public CandidateResults search( final IndexScope indexScope, final SearchTypes searchTypes, 
             final Query query ) {
 
-        final String context = IndexingUtils.createContextName( indexScope );
+        final String context = IndexingUtils.createContextName(indexScope);
         final String[] entityTypes = searchTypes.getTypeNames();
 
         QueryBuilder qb = query.createQueryBuilder( context );
@@ -269,8 +263,8 @@ public class EsEntityIndexImpl implements EntityIndex {
         SearchResponse searchResponse;
 
         if ( query.getCursor() == null ) {
-            SearchRequestBuilder srb = esProvider.getClient().prepareSearch( aliasName ).setTypes( entityTypes )
-                                                 .setScroll( cursorTimeout + "m" ).setQuery( qb );
+            SearchRequestBuilder srb = esProvider.getClient().prepareSearch( alias.getReadAlias() ).setTypes(entityTypes)
+                                                 .setScroll(cursorTimeout + "m").setQuery(qb);
 
 
 
@@ -324,7 +318,7 @@ public class EsEntityIndexImpl implements EntityIndex {
 
             if ( logger.isDebugEnabled() ) {
                 logger.debug( "Searching index {}\n  scope{} \n type {}\n   query {} ", new Object[] {
-                        this.aliasName, context, entityTypes, srb
+                        this.alias, context, entityTypes, srb
                 } );
 
             }
@@ -368,7 +362,7 @@ public class EsEntityIndexImpl implements EntityIndex {
             failureMonitor.success();
         }
 
-        return parseResults( searchResponse, query );
+        return parseResults(searchResponse, query);
     }
 
 
@@ -398,7 +392,7 @@ public class EsEntityIndexImpl implements EntityIndex {
 
         if ( candidates.size() >= query.getLimit() ) {
             candidateResults.setCursor( searchResponse.getScrollId() );
-            logger.debug( "   Cursor = " + searchResponse.getScrollId() );
+            logger.debug("   Cursor = " + searchResponse.getScrollId());
         }
 
         return candidateResults;
@@ -408,14 +402,14 @@ public class EsEntityIndexImpl implements EntityIndex {
     public void refresh() {
 
 
-        logger.info( "Refreshing Created new Index Name [{}]", aliasName );
+        logger.info( "Refreshing Created new Index Name [{}]", alias);
 
         final RetryOperation retryOperation = new RetryOperation() {
             @Override
             public boolean doOp() {
                 try {
-                    esProvider.getClient().admin().indices().prepareRefresh( aliasName ).execute().actionGet();
-                    logger.debug( "Refreshed index: " + aliasName );
+                    esProvider.getClient().admin().indices().prepareRefresh( alias.getWriteAlias() ).execute().actionGet();
+                    logger.debug( "Refreshed index: " + alias);
 
                     return true;
                 }
@@ -428,7 +422,7 @@ public class EsEntityIndexImpl implements EntityIndex {
 
         doInRetry( retryOperation );
 
-        logger.debug( "Refreshed index: " + aliasName );
+        logger.debug( "Refreshed index: " + alias);
     }
 
 
@@ -447,15 +441,15 @@ public class EsEntityIndexImpl implements EntityIndex {
 
         //since we don't have paging inputs, there's no point in executing a query for paging.
 
-        final String context = IndexingUtils.createContextName( scope );
+        final String context = IndexingUtils.createContextName(scope);
         final SearchTypes searchTypes = SearchTypes.fromTypes( id.getType() );
 
         final QueryBuilder queryBuilder = QueryBuilders.termQuery( IndexingUtils.ENTITY_CONTEXT_FIELDNAME, context );
 
 
         final SearchRequestBuilder srb =
-                esProvider.getClient().prepareSearch( aliasName ).setTypes( searchTypes.getTypeNames() )
-                          .setScroll( cursorTimeout + "m" ).setQuery( queryBuilder );
+                esProvider.getClient().prepareSearch( alias.getReadAlias() ).setTypes(searchTypes.getTypeNames())
+                          .setScroll(cursorTimeout + "m").setQuery(queryBuilder);
 
 
 
@@ -472,7 +466,7 @@ public class EsEntityIndexImpl implements EntityIndex {
 
         failureMonitor.success();
 
-        return parseResults( searchResponse, new Query(  ) );
+        return parseResults(searchResponse, new Query());
     }
 
 
@@ -481,12 +475,12 @@ public class EsEntityIndexImpl implements EntityIndex {
      */
     public void deleteIndex() {
         AdminClient adminClient = esProvider.getClient().admin();
-        DeleteIndexResponse response = adminClient.indices().prepareDelete( aliasName ).get();
+        DeleteIndexResponse response = adminClient.indices().prepareDelete( indexIdentifier.getIndex(0) ).get();
         if ( response.isAcknowledged() ) {
-            logger.info( "Deleted index: " + aliasName );
+            logger.info( "Deleted index: " + alias);
         }
         else {
-            logger.info( "Failed to delete index " + aliasName );
+            logger.info( "Failed to delete index " + alias);
         }
     }
 
@@ -545,7 +539,7 @@ public class EsEntityIndexImpl implements EntityIndex {
 
         try {
             ClusterHealthResponse chr = esProvider.getClient().admin().cluster()
-                                                  .health( new ClusterHealthRequest( new String[] { aliasName } ) )
+                                                  .health(new ClusterHealthRequest(new String[]{indexIdentifier.getIndex(0)}))
                                                   .get();
             return Health.valueOf( chr.getStatus().name() );
         }