You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/03/18 21:56:22 UTC

[22/50] incubator-usergrid git commit: Fixed naming convention issue

Fixed naming convention issue

Fixes blocking on cleanup (not necessary)

Fixes index creation when alias is cached


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c5943465
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c5943465
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c5943465

Branch: refs/heads/USERGRID-460
Commit: c594346558b9624c05f62096f8d83a0a91ce8af5
Parents: 9630fcf
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Mar 11 17:02:58 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Mar 11 17:02:58 2015 -0600

----------------------------------------------------------------------
 .../results/FilteringLoader.java                |   2 +-
 .../index/impl/BufferQueueSQSImpl.java          |  14 +-
 .../index/impl/EsEntityIndexBatchImpl.java      |   4 +-
 .../index/impl/EsEntityIndexImpl.java           |  14 +-
 .../index/impl/EsIndexBufferConsumerImpl.java   |  19 +--
 .../persistence/index/impl/EsIndexCache.java    | 138 +++++++++++--------
 .../queue/impl/SQSQueueManagerImpl.java         |  10 +-
 7 files changed, 124 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c5943465/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
index 1afd76b..2cd9fdc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
@@ -233,7 +233,7 @@ public class FilteringLoader implements ResultsLoader {
 
     @Override
     public void postProcess() {
-        this.indexBatch.execute().get();
+        this.indexBatch.execute();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c5943465/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
index 25c2ba6..3cace11 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
@@ -127,6 +127,12 @@ public class BufferQueueSQSImpl implements BufferQueue {
     @Override
     public void offer( final IndexOperationMessage operation ) {
 
+        //no op
+        if(operation.isEmpty()){
+            operation.getFuture().done();
+            return;
+        }
+
         final Timer.Context timer = this.writeTimer.time();
         this.writeMeter.mark();
 
@@ -141,7 +147,7 @@ public class BufferQueueSQSImpl implements BufferQueue {
 
             //signal to SQS
             this.queue.sendMessage( identifier );
-            operation.getFuture().run();
+            operation.getFuture().done();
         }
         catch ( IOException e ) {
             throw new RuntimeException( "Unable to queue message", e );
@@ -184,7 +190,7 @@ public class BufferQueueSQSImpl implements BufferQueue {
             }
 
             //look up the values
-            final Map<String, String> values = mapManager.getStrings( mapEntries );
+            final Map<String, String> storedCommands = mapManager.getStrings( mapEntries );
 
 
             //load them into our response
@@ -193,7 +199,7 @@ public class BufferQueueSQSImpl implements BufferQueue {
                 final String key = getMessageKey( message );
 
                 //now see if the key was there
-                final String payload = values.get( key );
+                final String payload = storedCommands.get( key );
 
                 //the entry was not present in cassandra, ignore this message.  Failure should eventually kick it to
                 // a DLQ
@@ -242,7 +248,7 @@ public class BufferQueueSQSImpl implements BufferQueue {
 
             final SqsIndexOperationMessage sqsIndexOperationMessage =   ( SqsIndexOperationMessage ) ioe;
 
-            final String key = getMessageKey(sqsIndexOperationMessage.getMessage());
+            final String key = getMessageKey( sqsIndexOperationMessage.getMessage() );
 
             //remove it from the map
             mapManager.delete( key  );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c5943465/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index b63dfe6..8481dab 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -197,7 +197,9 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
          * No-op, just disregard it
          */
         if(tempContainer.isEmpty()){
-            return tempContainer.getFuture();
+            final BetterFuture<?> future =  tempContainer.getFuture();
+            future.done();
+            return future;
         }
 
         return indexBatchBufferProducer.put(tempContainer);

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c5943465/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 99643da..fa50734 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -209,7 +209,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
     public void initializeIndex() {
         final int numberOfShards = config.getNumberOfShards();
         final int numberOfReplicas = config.getNumberOfReplicas();
-        String[] indexes = getIndexes(AliasType.Write);
+        String[] indexes = getIndexesFromEs(AliasType.Write);
         if(indexes == null || indexes.length==0) {
             addIndex(null, numberOfShards, numberOfReplicas, config.getWriteConsistencyLevel());
         }
@@ -336,6 +336,18 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
 
 
     /**
+     * Get our index info from ES, but clear our cache first
+     * @param aliasType
+     * @return
+     */
+    public String[] getIndexesFromEs(final AliasType aliasType){
+        aliasCache.invalidate( alias );
+        return getIndexes( aliasType );
+    }
+
+
+
+    /**
      * Tests writing a document to a new index to ensure it's working correctly. See this post:
      * http://s.apache.org/index-missing-exception
      */

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c5943465/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 2762c18..862b1ae 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -46,6 +46,7 @@ import rx.Subscription;
 import rx.functions.Action1;
 import rx.functions.Action2;
 import rx.functions.Func1;
+import rx.functions.Func2;
 import rx.schedulers.Schedulers;
 
 import java.util.List;
@@ -191,27 +192,27 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
         }
 
         //process and flatten all the messages to builder requests
-        Observable<IndexOperationMessage> flattenMessages = Observable.from( operationMessages );
-
-
         //batch shard operations into a bulk request
-        flattenMessages.flatMap( new Func1<IndexOperationMessage, Observable<BatchRequest>>() {
+        Observable.from( operationMessages ).flatMap( new Func1<IndexOperationMessage, Observable<BatchRequest>>() {
             @Override
             public Observable<BatchRequest> call( final IndexOperationMessage indexOperationMessage ) {
                 final Observable<IndexRequest> index = Observable.from( indexOperationMessage.getIndexRequests() );
-                final Observable<DeIndexRequest> deIndex = Observable.from( indexOperationMessage.getDeIndexRequests() );
+                final Observable<DeIndexRequest> deIndex =
+                    Observable.from( indexOperationMessage.getDeIndexRequests() );
 
-                indexSizeCounter.inc(indexOperationMessage.getDeIndexRequests().size());
-                indexSizeCounter.inc(indexOperationMessage.getIndexRequests().size());
+                indexSizeCounter.dec( indexOperationMessage.getDeIndexRequests().size() );
+                indexSizeCounter.dec( indexOperationMessage.getIndexRequests().size() );
 
                 return Observable.merge( index, deIndex );
             }
         } )
       //collection all the operations into a single stream
-       .collect( initRequest(), new Action2<BulkRequestBuilder, BatchRequest>() {
+       .reduce( initRequest(), new Func2<BulkRequestBuilder, BatchRequest, BulkRequestBuilder>() {
            @Override
-           public void call( final BulkRequestBuilder bulkRequestBuilder, final BatchRequest batchRequest ) {
+           public BulkRequestBuilder call( final BulkRequestBuilder bulkRequestBuilder, final BatchRequest batchRequest ) {
                batchRequest.doOperation( client, bulkRequestBuilder );
+
+               return bulkRequestBuilder;
            }
        } )
         //send the request off to ES

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c5943465/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexCache.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexCache.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexCache.java
index 0c07a34..ef518dd 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexCache.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexCache.java
@@ -20,6 +20,24 @@
 
 package org.apache.usergrid.persistence.index.impl;
 
+
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
+import org.elasticsearch.client.AdminClient;
+import org.elasticsearch.cluster.metadata.AliasMetaData;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.index.AliasedEntityIndex;
+import org.apache.usergrid.persistence.index.IndexFig;
+import org.apache.usergrid.persistence.index.IndexIdentifier;
+
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
@@ -29,21 +47,6 @@ import com.google.common.util.concurrent.ListeningScheduledExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import org.apache.usergrid.persistence.index.AliasedEntityIndex;
-import org.apache.usergrid.persistence.index.IndexFig;
-import org.apache.usergrid.persistence.index.IndexIdentifier;
-import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
-import org.elasticsearch.client.AdminClient;
-import org.elasticsearch.cluster.metadata.AliasMetaData;
-import org.elasticsearch.common.collect.ImmutableOpenMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 
 
 /**
@@ -52,65 +55,88 @@ import java.util.concurrent.TimeUnit;
 @Singleton
 public class EsIndexCache {
 
-    private static final Logger logger = LoggerFactory.getLogger(EsEntityIndexImpl.class);
+    private static final Logger logger = LoggerFactory.getLogger( EsEntityIndexImpl.class );
     private final ListeningScheduledExecutorService refreshExecutors;
 
     private LoadingCache<String, String[]> aliasIndexCache;
+    private EsProvider provider;
+
 
     @Inject
-    public EsIndexCache(final EsProvider provider, final IndexFig indexFig) {
-        
-        this.refreshExecutors = MoreExecutors
-                .listeningDecorator(Executors.newScheduledThreadPool(indexFig.getIndexCacheMaxWorkers()));
-        
-        aliasIndexCache = CacheBuilder.newBuilder().maximumSize(1000)
-                .refreshAfterWrite(5,TimeUnit.MINUTES)
-                .build(new CacheLoader<String, String[]>() {
-                    @Override
-                    public ListenableFuture<String[]> reload(final String key, String[] oldValue) throws Exception {
-                        ListenableFutureTask<String[]> task = ListenableFutureTask.create( new Callable<String[]>() {
-                            public String[] call() {
-                                return load( key );
-                            }
-                        } );
-                        refreshExecutors.execute(task);
-                        return task;
-                    }
-
-                    @Override
-                    public String[] load(final String aliasName) {
-                        final AdminClient adminClient = provider.getClient().admin();
-                        //remove write alias, can only have one
-                        ImmutableOpenMap<String, List<AliasMetaData>> aliasMap = 
-                           adminClient.indices().getAliases(new GetAliasesRequest(aliasName)).actionGet().getAliases();
-                        return aliasMap.keys().toArray(String.class);
-                    }
-                }) ;
+    public EsIndexCache( final EsProvider provider, final IndexFig indexFig ) {
+
+        this.refreshExecutors =
+            MoreExecutors.listeningDecorator( Executors.newScheduledThreadPool( indexFig.getIndexCacheMaxWorkers() ) );
+
+        this.provider = provider;
+
+        aliasIndexCache = CacheBuilder.newBuilder().maximumSize( 1000 ).refreshAfterWrite( 5, TimeUnit.MINUTES )
+                                      .build( new CacheLoader<String, String[]>() {
+                                          @Override
+                                          public ListenableFuture<String[]> reload( final String key,
+                                                                                    String[] oldValue )
+                                              throws Exception {
+                                              ListenableFutureTask<String[]> task =
+                                                  ListenableFutureTask.create( new Callable<String[]>() {
+                                                      public String[] call() {
+                                                          return load( key );
+                                                      }
+                                                  } );
+                                              refreshExecutors.execute( task );
+                                              return task;
+                                          }
+
+
+                                          @Override
+                                          public String[] load( final String aliasName ) {
+                                             return getIndexesFromEs(aliasName);
+                                          }
+                                      } );
     }
 
-    
+
     /**
      * Get indexes for an alias
      */
-    public String[] getIndexes(IndexIdentifier.IndexAlias alias, AliasedEntityIndex.AliasType aliasType) {
+    public String[] getIndexes( IndexIdentifier.IndexAlias alias, AliasedEntityIndex.AliasType aliasType ) {
         String[] indexes;
         try {
-            indexes = aliasIndexCache.get(aliasType == AliasedEntityIndex.AliasType.Read ? alias.getReadAlias() : alias.getWriteAlias());
-        } catch (ExecutionException ee) {
-            logger.error("Failed to retreive indexes", ee);
-            throw new RuntimeException(ee);
+            indexes = aliasIndexCache.get( getAliasName( alias, aliasType ) );
+        }
+        catch ( ExecutionException ee ) {
+            logger.error( "Failed to retreive indexes", ee );
+            throw new RuntimeException( ee );
         }
         return indexes;
     }
 
-    
+
+
+    private String[] getIndexesFromEs(final String aliasName){
+        final AdminClient adminClient = this.provider.getClient().admin();
+             //remove write alias, can only have one
+        ImmutableOpenMap<String, List<AliasMetaData>> aliasMap =
+            adminClient.indices().getAliases( new GetAliasesRequest( aliasName ) ).actionGet().getAliases();
+        return aliasMap.keys().toArray( String.class );
+    }
+
+
     /**
-     * clean up cache
+     * Get the name of the alias to use
+     * @param alias
+     * @param aliasType
+     * @return
      */
-    public void invalidate(IndexIdentifier.IndexAlias alias){
-        aliasIndexCache.invalidate(alias.getWriteAlias());
-        aliasIndexCache.invalidate(alias.getReadAlias());
-
+    private String getAliasName( IndexIdentifier.IndexAlias alias, AliasedEntityIndex.AliasType aliasType ) {
+        return aliasType == AliasedEntityIndex.AliasType.Read ? alias.getReadAlias() : alias.getWriteAlias();
     }
 
+
+    /**
+     * clean up cache
+     */
+    public void invalidate( IndexIdentifier.IndexAlias alias ) {
+        aliasIndexCache.invalidate( alias.getWriteAlias() );
+        aliasIndexCache.invalidate( alias.getReadAlias() );
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c5943465/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index a78fc80..1fbd9b6 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -33,7 +33,6 @@ import org.apache.usergrid.persistence.queue.QueueManager;
 import org.apache.usergrid.persistence.queue.QueueMessage;
 import org.apache.usergrid.persistence.queue.QueueScope;
 
-import com.amazonaws.AbortedException;
 import com.amazonaws.regions.Region;
 import com.amazonaws.regions.Regions;
 import com.amazonaws.services.sqs.AmazonSQSClient;
@@ -76,15 +75,16 @@ public class SQSQueueManagerImpl implements QueueManager {
             .maximumSize( 1000 )
             .build( new CacheLoader<String, Queue>() {
                 @Override
-                public Queue load( String queueLoader ) throws Exception {
+                public Queue load( String queueName ) throws Exception {
 
                     //the amazon client is not thread safe, we need to create one per queue
                     Queue queue = null;
                     try {
-                        GetQueueUrlResult result = sqs.getQueueUrl( queueLoader );
+                        GetQueueUrlResult result = sqs.getQueueUrl( queueName );
                         queue = new Queue( result.getQueueUrl() );
                     }catch ( QueueDoesNotExistException queueDoesNotExistException ) {
                         //no op, swallow
+                        LOG.error( "Queue {} does not exist, creating", queueName );
 
                     }
                     catch ( Exception e ) {
@@ -92,7 +92,7 @@ public class SQSQueueManagerImpl implements QueueManager {
                         throw e;
                     }
                     if ( queue == null ) {
-                        CreateQueueRequest createQueueRequest = new CreateQueueRequest().withQueueName( queueLoader );
+                        CreateQueueRequest createQueueRequest = new CreateQueueRequest().withQueueName( queueName );
                         CreateQueueResult result = sqs.createQueue( createQueueRequest );
                         String url = result.getQueueUrl();
                         queue = new Queue( url );
@@ -124,7 +124,7 @@ public class SQSQueueManagerImpl implements QueueManager {
 
 
     private String getName() {
-        String name = scope.getApplication().getType() + "_"+ scope.getName() + "_"+ scope.getApplication().getUuid().toString();
+        String name = fig.getPrefix() + "_" + scope.getApplication().getType() + "_"+ scope.getName() + "_"+ scope.getApplication().getUuid().toString();
         return name;
     }