You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/12/10 01:17:45 UTC
[09/13] incubator-usergrid git commit: add caching to indexes
add caching to indexes
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b545c0e4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b545c0e4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b545c0e4
Branch: refs/heads/two-dot-o
Commit: b545c0e4715d458f7e6b5c1cfa0e96a1f86068b2
Parents: 8d34426
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Dec 9 09:38:45 2014 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Dec 9 09:38:45 2014 -0700
----------------------------------------------------------------------
.../index/impl/EsEntityIndexImpl.java | 30 ++-----
.../persistence/index/impl/EsIndexCache.java | 91 ++++++++++++++++++++
2 files changed, 99 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b545c0e4/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 3f895b5..a6e9337 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -30,7 +30,6 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequest;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
-import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
@@ -38,8 +37,6 @@ import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequestBuilder;
import org.elasticsearch.client.AdminClient;
-import org.elasticsearch.cluster.metadata.AliasMetaData;
-import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -120,11 +117,11 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
private static final MatchAllQueryBuilder MATCH_ALL_QUERY_BUILDER = QueryBuilders.matchAllQuery();
+ private EsIndexCache aliasCache;
- @Inject
- public EsEntityIndexImpl( @Assisted final ApplicationScope appScope, final IndexFig config,
- final EsProvider provider ) {
+ @Inject
+ public EsEntityIndexImpl( @Assisted final ApplicationScope appScope, final IndexFig config, final EsProvider provider, final EsIndexCache indexCache) {
ValidationUtils.validateApplicationScope( appScope );
this.applicationScope = appScope;
this.esProvider = provider;
@@ -133,6 +130,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
this.indexIdentifier = IndexingUtils.createIndexIdentifier(config, appScope);
this.alias = indexIdentifier.getAlias();
this.failureMonitor = new FailureMonitorImpl( config, provider );
+ this.aliasCache = indexCache;
}
@Override
@@ -188,7 +186,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
String indexName = indexIdentifier.getIndex(indexSuffix);
final AdminClient adminClient = esProvider.getClient().admin();
- String[] indexNames = getIndexes(alias.getWriteAlias());
+ String[] indexNames = getIndexes(AliasType.Write);
for(String currentIndex : indexNames){
isAck = adminClient.indices().prepareAliases().removeAlias(currentIndex,
@@ -206,7 +204,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
isAck = adminClient.indices().prepareAliases().addAlias(
indexName, alias.getWriteAlias()).execute().actionGet().isAcknowledged();
logger.info("Created new write Alias Name [{}] ACK=[{}]", alias, isAck);
-
+ aliasCache.invalidate(alias);
} catch (Exception e) {
logger.warn("Failed to create alias ", e);
}
@@ -214,21 +212,9 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
@Override
public String[] getIndexes(final AliasType aliasType) {
- final String aliasName = aliasType == AliasType.Read ? alias.getReadAlias() : alias.getWriteAlias();
- return getIndexes(aliasName);
+ return aliasCache.getIndexes(alias,aliasType);
}
- /**
- * get indexes for alias
- * @param aliasName
- * @return
- */
- private String[] getIndexes(final String aliasName){
- final AdminClient adminClient = esProvider.getClient().admin();
- //remove write alias, can only have one
- ImmutableOpenMap<String,List<AliasMetaData>> aliasMap = adminClient.indices().getAliases(new GetAliasesRequest(aliasName)).actionGet().getAliases();
- return aliasMap.keys().toArray(String.class);
- }
/**
* Tests writing a document to a new index to ensure it's working correctly. See this post:
@@ -457,7 +443,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
@Override
public boolean doOp() {
try {
- String[] indexes = getIndexes(AliasType.Read);
+ String[] indexes = getIndexes(AliasType.Read);
Observable.from(indexes).subscribeOn(Schedulers.io()).map(new Func1<String, String>() {
@Override
public String call(String index) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b545c0e4/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexCache.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexCache.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexCache.java
new file mode 100644
index 0000000..710cc60
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexCache.java
@@ -0,0 +1,91 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.persistence.index.impl;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.index.AliasedEntityIndex;
+import org.apache.usergrid.persistence.index.IndexIdentifier;
+import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
+import org.elasticsearch.client.AdminClient;
+import org.elasticsearch.cluster.metadata.AliasMetaData;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Classy class class.
+ */
+@Singleton
+public class EsIndexCache {
+
+ private static final Logger logger = LoggerFactory.getLogger(EsEntityIndexImpl.class);
+
+ private LoadingCache<String, String[]> aliasIndexCache;
+
+ @Inject
+ public EsIndexCache(final EsProvider provider) {
+ aliasIndexCache = CacheBuilder.newBuilder().maximumSize(1000)
+ .refreshAfterWrite(5,TimeUnit.MINUTES)
+ .build(new CacheLoader<String, String[]>() {
+ @Override
+ public ListenableFuture<String[]> reload(String key, String[] oldValue) throws Exception {
+ return super.reload(key, oldValue);
+ }
+
+ @Override
+ public String[] load(String aliasName) {
+ final AdminClient adminClient = provider.getClient().admin();
+ //remove write alias, can only have one
+ ImmutableOpenMap<String, List<AliasMetaData>> aliasMap = adminClient.indices().getAliases(new GetAliasesRequest(aliasName)).actionGet().getAliases();
+ return aliasMap.keys().toArray(String.class);
+ }
+ })
+
+ ;
+ }
+
+ public String[] getIndexes(IndexIdentifier.IndexAlias alias, AliasedEntityIndex.AliasType aliasType) {
+ String[] indexes;
+ try {
+ indexes = aliasIndexCache.get(aliasType == AliasedEntityIndex.AliasType.Read ? alias.getReadAlias() : alias.getWriteAlias());
+ } catch (ExecutionException ee) {
+ logger.error("Failed to retreive indexes", ee);
+ throw new RuntimeException(ee);
+ }
+ return indexes;
+ }
+
+ public void invalidate(IndexIdentifier.IndexAlias alias){
+ aliasIndexCache.invalidate(alias.getWriteAlias());
+ aliasIndexCache.invalidate(alias.getReadAlias());
+
+ }
+
+}