You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/12/10 01:31:49 UTC

[09/14] incubator-usergrid git commit: add caching to indexes

add caching to indexes


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b545c0e4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b545c0e4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b545c0e4

Branch: refs/heads/USERGRID-252
Commit: b545c0e4715d458f7e6b5c1cfa0e96a1f86068b2
Parents: 8d34426
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Dec 9 09:38:45 2014 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Dec 9 09:38:45 2014 -0700

----------------------------------------------------------------------
 .../index/impl/EsEntityIndexImpl.java           | 30 ++-----
 .../persistence/index/impl/EsIndexCache.java    | 91 ++++++++++++++++++++
 2 files changed, 99 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b545c0e4/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 3f895b5..a6e9337 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -30,7 +30,6 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequest;
 import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
-import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
 import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
@@ -38,8 +37,6 @@ import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.SearchScrollRequestBuilder;
 import org.elasticsearch.client.AdminClient;
-import org.elasticsearch.cluster.metadata.AliasMetaData;
-import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -120,11 +117,11 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
 
     private static final MatchAllQueryBuilder MATCH_ALL_QUERY_BUILDER = QueryBuilders.matchAllQuery();
 
+    private EsIndexCache aliasCache;
 
-    @Inject
-    public EsEntityIndexImpl( @Assisted final ApplicationScope appScope, final IndexFig config, 
-            final EsProvider provider ) {
 
+    @Inject
+    public EsEntityIndexImpl( @Assisted final ApplicationScope appScope, final IndexFig config, final EsProvider provider, final EsIndexCache indexCache) {
         ValidationUtils.validateApplicationScope( appScope );
         this.applicationScope = appScope;
         this.esProvider = provider;
@@ -133,6 +130,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
         this.indexIdentifier = IndexingUtils.createIndexIdentifier(config, appScope);
         this.alias = indexIdentifier.getAlias();
         this.failureMonitor = new FailureMonitorImpl( config, provider );
+        this.aliasCache = indexCache;
     }
 
     @Override
@@ -188,7 +186,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
             String indexName = indexIdentifier.getIndex(indexSuffix);
             final AdminClient adminClient = esProvider.getClient().admin();
 
-            String[] indexNames = getIndexes(alias.getWriteAlias());
+            String[] indexNames = getIndexes(AliasType.Write);
 
             for(String currentIndex : indexNames){
                 isAck = adminClient.indices().prepareAliases().removeAlias(currentIndex,
@@ -206,7 +204,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
             isAck = adminClient.indices().prepareAliases().addAlias(
                     indexName, alias.getWriteAlias()).execute().actionGet().isAcknowledged();
             logger.info("Created new write Alias Name [{}] ACK=[{}]", alias, isAck);
-
+            aliasCache.invalidate(alias);
         } catch (Exception e) {
             logger.warn("Failed to create alias ", e);
         }
@@ -214,21 +212,9 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
 
     @Override
     public String[] getIndexes(final AliasType aliasType) {
-        final String aliasName = aliasType == AliasType.Read ? alias.getReadAlias() : alias.getWriteAlias();
-        return getIndexes(aliasName);
+        return aliasCache.getIndexes(alias,aliasType);
     }
 
-    /**
-     * get indexes for alias
-     * @param aliasName
-     * @return
-     */
-    private String[] getIndexes(final String aliasName){
-        final AdminClient adminClient = esProvider.getClient().admin();
-        //remove write alias, can only have one
-        ImmutableOpenMap<String,List<AliasMetaData>> aliasMap = adminClient.indices().getAliases(new GetAliasesRequest(aliasName)).actionGet().getAliases();
-        return aliasMap.keys().toArray(String.class);
-    }
 
     /**
      * Tests writing a document to a new index to ensure it's working correctly. See this post:
@@ -457,7 +443,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
             @Override
             public boolean doOp() {
                 try {
-                    String[] indexes = getIndexes(AliasType.Read);
+                    String[]  indexes = getIndexes(AliasType.Read);
                     Observable.from(indexes).subscribeOn(Schedulers.io()).map(new Func1<String, String>() {
                         @Override
                         public String call(String index) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b545c0e4/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexCache.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexCache.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexCache.java
new file mode 100644
index 0000000..710cc60
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexCache.java
@@ -0,0 +1,91 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.persistence.index.impl;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.index.AliasedEntityIndex;
+import org.apache.usergrid.persistence.index.IndexIdentifier;
+import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
+import org.elasticsearch.client.AdminClient;
+import org.elasticsearch.cluster.metadata.AliasMetaData;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Classy class class.
+ */
+@Singleton
+public class EsIndexCache {
+
+    private static final Logger logger = LoggerFactory.getLogger(EsEntityIndexImpl.class);
+
+    private LoadingCache<String, String[]> aliasIndexCache;
+
+    @Inject
+    public EsIndexCache(final EsProvider provider) {
+        aliasIndexCache = CacheBuilder.newBuilder().maximumSize(1000)
+                .refreshAfterWrite(5,TimeUnit.MINUTES)
+                .build(new CacheLoader<String, String[]>() {
+                    @Override
+                    public ListenableFuture<String[]> reload(String key, String[] oldValue) throws Exception {
+                        return super.reload(key, oldValue);
+                    }
+
+                    @Override
+                    public String[] load(String aliasName) {
+                        final AdminClient adminClient = provider.getClient().admin();
+                        //remove write alias, can only have one
+                        ImmutableOpenMap<String, List<AliasMetaData>> aliasMap = adminClient.indices().getAliases(new GetAliasesRequest(aliasName)).actionGet().getAliases();
+                        return aliasMap.keys().toArray(String.class);
+                    }
+                })
+
+        ;
+    }
+
+    public String[] getIndexes(IndexIdentifier.IndexAlias alias, AliasedEntityIndex.AliasType aliasType) {
+        String[] indexes;
+        try {
+            indexes = aliasIndexCache.get(aliasType == AliasedEntityIndex.AliasType.Read ? alias.getReadAlias() : alias.getWriteAlias());
+        } catch (ExecutionException ee) {
+            logger.error("Failed to retreive indexes", ee);
+            throw new RuntimeException(ee);
+        }
+        return indexes;
+    }
+
+    public void invalidate(IndexIdentifier.IndexAlias alias){
+        aliasIndexCache.invalidate(alias.getWriteAlias());
+        aliasIndexCache.invalidate(alias.getReadAlias());
+
+    }
+
+}