You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/03/17 00:37:56 UTC
[28/50] incubator-usergrid git commit: Fixed naming convention issue
Fixed naming convention issue
Stops blocking on cleanup: FilteringLoader.postProcess() no longer waits on the index batch future, since the wait is not necessary
Fixes index creation when the alias is cached: initializeIndex() now invalidates the alias cache before looking up the write indexes
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c5943465
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c5943465
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c5943465
Branch: refs/heads/two-dot-o
Commit: c594346558b9624c05f62096f8d83a0a91ce8af5
Parents: 9630fcf
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Mar 11 17:02:58 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Mar 11 17:02:58 2015 -0600
----------------------------------------------------------------------
.../results/FilteringLoader.java | 2 +-
.../index/impl/BufferQueueSQSImpl.java | 14 +-
.../index/impl/EsEntityIndexBatchImpl.java | 4 +-
.../index/impl/EsEntityIndexImpl.java | 14 +-
.../index/impl/EsIndexBufferConsumerImpl.java | 19 +--
.../persistence/index/impl/EsIndexCache.java | 138 +++++++++++--------
.../queue/impl/SQSQueueManagerImpl.java | 10 +-
7 files changed, 124 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c5943465/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
index 1afd76b..2cd9fdc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
@@ -233,7 +233,7 @@ public class FilteringLoader implements ResultsLoader {
@Override
public void postProcess() {
- this.indexBatch.execute().get();
+ this.indexBatch.execute();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c5943465/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
index 25c2ba6..3cace11 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
@@ -127,6 +127,12 @@ public class BufferQueueSQSImpl implements BufferQueue {
@Override
public void offer( final IndexOperationMessage operation ) {
+ //no op
+ if(operation.isEmpty()){
+ operation.getFuture().done();
+ return;
+ }
+
final Timer.Context timer = this.writeTimer.time();
this.writeMeter.mark();
@@ -141,7 +147,7 @@ public class BufferQueueSQSImpl implements BufferQueue {
//signal to SQS
this.queue.sendMessage( identifier );
- operation.getFuture().run();
+ operation.getFuture().done();
}
catch ( IOException e ) {
throw new RuntimeException( "Unable to queue message", e );
@@ -184,7 +190,7 @@ public class BufferQueueSQSImpl implements BufferQueue {
}
//look up the values
- final Map<String, String> values = mapManager.getStrings( mapEntries );
+ final Map<String, String> storedCommands = mapManager.getStrings( mapEntries );
//load them into our response
@@ -193,7 +199,7 @@ public class BufferQueueSQSImpl implements BufferQueue {
final String key = getMessageKey( message );
//now see if the key was there
- final String payload = values.get( key );
+ final String payload = storedCommands.get( key );
//the entry was not present in cassandra, ignore this message. Failure should eventually kick it to
// a DLQ
@@ -242,7 +248,7 @@ public class BufferQueueSQSImpl implements BufferQueue {
final SqsIndexOperationMessage sqsIndexOperationMessage = ( SqsIndexOperationMessage ) ioe;
- final String key = getMessageKey(sqsIndexOperationMessage.getMessage());
+ final String key = getMessageKey( sqsIndexOperationMessage.getMessage() );
//remove it from the map
mapManager.delete( key );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c5943465/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index b63dfe6..8481dab 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -197,7 +197,9 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
* No-op, just disregard it
*/
if(tempContainer.isEmpty()){
- return tempContainer.getFuture();
+ final BetterFuture<?> future = tempContainer.getFuture();
+ future.done();
+ return future;
}
return indexBatchBufferProducer.put(tempContainer);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c5943465/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 99643da..fa50734 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -209,7 +209,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
public void initializeIndex() {
final int numberOfShards = config.getNumberOfShards();
final int numberOfReplicas = config.getNumberOfReplicas();
- String[] indexes = getIndexes(AliasType.Write);
+ String[] indexes = getIndexesFromEs(AliasType.Write);
if(indexes == null || indexes.length==0) {
addIndex(null, numberOfShards, numberOfReplicas, config.getWriteConsistencyLevel());
}
@@ -336,6 +336,18 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
/**
+ * Get our index info from ES, but clear our cache first
+ * @param aliasType
+ * @return
+ */
+ public String[] getIndexesFromEs(final AliasType aliasType){
+ aliasCache.invalidate( alias );
+ return getIndexes( aliasType );
+ }
+
+
+
+ /**
* Tests writing a document to a new index to ensure it's working correctly. See this post:
* http://s.apache.org/index-missing-exception
*/
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c5943465/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 2762c18..862b1ae 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -46,6 +46,7 @@ import rx.Subscription;
import rx.functions.Action1;
import rx.functions.Action2;
import rx.functions.Func1;
+import rx.functions.Func2;
import rx.schedulers.Schedulers;
import java.util.List;
@@ -191,27 +192,27 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
}
//process and flatten all the messages to builder requests
- Observable<IndexOperationMessage> flattenMessages = Observable.from( operationMessages );
-
-
//batch shard operations into a bulk request
- flattenMessages.flatMap( new Func1<IndexOperationMessage, Observable<BatchRequest>>() {
+ Observable.from( operationMessages ).flatMap( new Func1<IndexOperationMessage, Observable<BatchRequest>>() {
@Override
public Observable<BatchRequest> call( final IndexOperationMessage indexOperationMessage ) {
final Observable<IndexRequest> index = Observable.from( indexOperationMessage.getIndexRequests() );
- final Observable<DeIndexRequest> deIndex = Observable.from( indexOperationMessage.getDeIndexRequests() );
+ final Observable<DeIndexRequest> deIndex =
+ Observable.from( indexOperationMessage.getDeIndexRequests() );
- indexSizeCounter.inc(indexOperationMessage.getDeIndexRequests().size());
- indexSizeCounter.inc(indexOperationMessage.getIndexRequests().size());
+ indexSizeCounter.dec( indexOperationMessage.getDeIndexRequests().size() );
+ indexSizeCounter.dec( indexOperationMessage.getIndexRequests().size() );
return Observable.merge( index, deIndex );
}
} )
//collection all the operations into a single stream
- .collect( initRequest(), new Action2<BulkRequestBuilder, BatchRequest>() {
+ .reduce( initRequest(), new Func2<BulkRequestBuilder, BatchRequest, BulkRequestBuilder>() {
@Override
- public void call( final BulkRequestBuilder bulkRequestBuilder, final BatchRequest batchRequest ) {
+ public BulkRequestBuilder call( final BulkRequestBuilder bulkRequestBuilder, final BatchRequest batchRequest ) {
batchRequest.doOperation( client, bulkRequestBuilder );
+
+ return bulkRequestBuilder;
}
} )
//send the request off to ES
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c5943465/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexCache.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexCache.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexCache.java
index 0c07a34..ef518dd 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexCache.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexCache.java
@@ -20,6 +20,24 @@
package org.apache.usergrid.persistence.index.impl;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
+import org.elasticsearch.client.AdminClient;
+import org.elasticsearch.cluster.metadata.AliasMetaData;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.index.AliasedEntityIndex;
+import org.apache.usergrid.persistence.index.IndexFig;
+import org.apache.usergrid.persistence.index.IndexIdentifier;
+
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
@@ -29,21 +47,6 @@ import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import org.apache.usergrid.persistence.index.AliasedEntityIndex;
-import org.apache.usergrid.persistence.index.IndexFig;
-import org.apache.usergrid.persistence.index.IndexIdentifier;
-import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
-import org.elasticsearch.client.AdminClient;
-import org.elasticsearch.cluster.metadata.AliasMetaData;
-import org.elasticsearch.common.collect.ImmutableOpenMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
/**
@@ -52,65 +55,88 @@ import java.util.concurrent.TimeUnit;
@Singleton
public class EsIndexCache {
- private static final Logger logger = LoggerFactory.getLogger(EsEntityIndexImpl.class);
+ private static final Logger logger = LoggerFactory.getLogger( EsEntityIndexImpl.class );
private final ListeningScheduledExecutorService refreshExecutors;
private LoadingCache<String, String[]> aliasIndexCache;
+ private EsProvider provider;
+
@Inject
- public EsIndexCache(final EsProvider provider, final IndexFig indexFig) {
-
- this.refreshExecutors = MoreExecutors
- .listeningDecorator(Executors.newScheduledThreadPool(indexFig.getIndexCacheMaxWorkers()));
-
- aliasIndexCache = CacheBuilder.newBuilder().maximumSize(1000)
- .refreshAfterWrite(5,TimeUnit.MINUTES)
- .build(new CacheLoader<String, String[]>() {
- @Override
- public ListenableFuture<String[]> reload(final String key, String[] oldValue) throws Exception {
- ListenableFutureTask<String[]> task = ListenableFutureTask.create( new Callable<String[]>() {
- public String[] call() {
- return load( key );
- }
- } );
- refreshExecutors.execute(task);
- return task;
- }
-
- @Override
- public String[] load(final String aliasName) {
- final AdminClient adminClient = provider.getClient().admin();
- //remove write alias, can only have one
- ImmutableOpenMap<String, List<AliasMetaData>> aliasMap =
- adminClient.indices().getAliases(new GetAliasesRequest(aliasName)).actionGet().getAliases();
- return aliasMap.keys().toArray(String.class);
- }
- }) ;
+ public EsIndexCache( final EsProvider provider, final IndexFig indexFig ) {
+
+ this.refreshExecutors =
+ MoreExecutors.listeningDecorator( Executors.newScheduledThreadPool( indexFig.getIndexCacheMaxWorkers() ) );
+
+ this.provider = provider;
+
+ aliasIndexCache = CacheBuilder.newBuilder().maximumSize( 1000 ).refreshAfterWrite( 5, TimeUnit.MINUTES )
+ .build( new CacheLoader<String, String[]>() {
+ @Override
+ public ListenableFuture<String[]> reload( final String key,
+ String[] oldValue )
+ throws Exception {
+ ListenableFutureTask<String[]> task =
+ ListenableFutureTask.create( new Callable<String[]>() {
+ public String[] call() {
+ return load( key );
+ }
+ } );
+ refreshExecutors.execute( task );
+ return task;
+ }
+
+
+ @Override
+ public String[] load( final String aliasName ) {
+ return getIndexesFromEs(aliasName);
+ }
+ } );
}
-
+
/**
* Get indexes for an alias
*/
- public String[] getIndexes(IndexIdentifier.IndexAlias alias, AliasedEntityIndex.AliasType aliasType) {
+ public String[] getIndexes( IndexIdentifier.IndexAlias alias, AliasedEntityIndex.AliasType aliasType ) {
String[] indexes;
try {
- indexes = aliasIndexCache.get(aliasType == AliasedEntityIndex.AliasType.Read ? alias.getReadAlias() : alias.getWriteAlias());
- } catch (ExecutionException ee) {
- logger.error("Failed to retreive indexes", ee);
- throw new RuntimeException(ee);
+ indexes = aliasIndexCache.get( getAliasName( alias, aliasType ) );
+ }
+ catch ( ExecutionException ee ) {
+ logger.error( "Failed to retreive indexes", ee );
+ throw new RuntimeException( ee );
}
return indexes;
}
-
+
+
+ private String[] getIndexesFromEs(final String aliasName){
+ final AdminClient adminClient = this.provider.getClient().admin();
+ //remove write alias, can only have one
+ ImmutableOpenMap<String, List<AliasMetaData>> aliasMap =
+ adminClient.indices().getAliases( new GetAliasesRequest( aliasName ) ).actionGet().getAliases();
+ return aliasMap.keys().toArray( String.class );
+ }
+
+
/**
- * clean up cache
+ * Get the name of the alias to use
+ * @param alias
+ * @param aliasType
+ * @return
*/
- public void invalidate(IndexIdentifier.IndexAlias alias){
- aliasIndexCache.invalidate(alias.getWriteAlias());
- aliasIndexCache.invalidate(alias.getReadAlias());
-
+ private String getAliasName( IndexIdentifier.IndexAlias alias, AliasedEntityIndex.AliasType aliasType ) {
+ return aliasType == AliasedEntityIndex.AliasType.Read ? alias.getReadAlias() : alias.getWriteAlias();
}
+
+ /**
+ * clean up cache
+ */
+ public void invalidate( IndexIdentifier.IndexAlias alias ) {
+ aliasIndexCache.invalidate( alias.getWriteAlias() );
+ aliasIndexCache.invalidate( alias.getReadAlias() );
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c5943465/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index a78fc80..1fbd9b6 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -33,7 +33,6 @@ import org.apache.usergrid.persistence.queue.QueueManager;
import org.apache.usergrid.persistence.queue.QueueMessage;
import org.apache.usergrid.persistence.queue.QueueScope;
-import com.amazonaws.AbortedException;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQSClient;
@@ -76,15 +75,16 @@ public class SQSQueueManagerImpl implements QueueManager {
.maximumSize( 1000 )
.build( new CacheLoader<String, Queue>() {
@Override
- public Queue load( String queueLoader ) throws Exception {
+ public Queue load( String queueName ) throws Exception {
//the amazon client is not thread safe, we need to create one per queue
Queue queue = null;
try {
- GetQueueUrlResult result = sqs.getQueueUrl( queueLoader );
+ GetQueueUrlResult result = sqs.getQueueUrl( queueName );
queue = new Queue( result.getQueueUrl() );
}catch ( QueueDoesNotExistException queueDoesNotExistException ) {
//no op, swallow
+ LOG.error( "Queue {} does not exist, creating", queueName );
}
catch ( Exception e ) {
@@ -92,7 +92,7 @@ public class SQSQueueManagerImpl implements QueueManager {
throw e;
}
if ( queue == null ) {
- CreateQueueRequest createQueueRequest = new CreateQueueRequest().withQueueName( queueLoader );
+ CreateQueueRequest createQueueRequest = new CreateQueueRequest().withQueueName( queueName );
CreateQueueResult result = sqs.createQueue( createQueueRequest );
String url = result.getQueueUrl();
queue = new Queue( url );
@@ -124,7 +124,7 @@ public class SQSQueueManagerImpl implements QueueManager {
private String getName() {
- String name = scope.getApplication().getType() + "_"+ scope.getName() + "_"+ scope.getApplication().getUuid().toString();
+ String name = fig.getPrefix() + "_" + scope.getApplication().getType() + "_"+ scope.getName() + "_"+ scope.getApplication().getUuid().toString();
return name;
}