You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/07/02 21:28:56 UTC

[1/6] incubator-usergrid git commit: Add batching to re-index requests.

Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-dev 136135d0c -> 6f2d0bf54


Add batching to re-index requests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/446a8e8a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/446a8e8a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/446a8e8a

Branch: refs/heads/two-dot-o-dev
Commit: 446a8e8af99aca59934400caa64c3b021bcb460e
Parents: 03d0534
Author: Michael Russo <mi...@gmail.com>
Authored: Wed Jul 1 17:23:34 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Wed Jul 1 17:23:34 2015 -0700

----------------------------------------------------------------------
 .../main/resources/usergrid-default.properties  |  6 ++++-
 .../asyncevents/AmazonAsyncEventService.java    | 25 +++++++++++++++++++-
 .../asyncevents/InMemoryAsyncEventService.java  | 13 ++++++++--
 .../index/IndexProcessorFig.java                |  8 ++++++-
 .../corepersistence/index/ReIndexAction.java    | 11 +++++++--
 .../index/ReIndexServiceImpl.java               | 24 +++++++++----------
 6 files changed, 67 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/446a8e8a/stack/config/src/main/resources/usergrid-default.properties
----------------------------------------------------------------------
diff --git a/stack/config/src/main/resources/usergrid-default.properties b/stack/config/src/main/resources/usergrid-default.properties
index d6ce318..b5b0e8c 100644
--- a/stack/config/src/main/resources/usergrid-default.properties
+++ b/stack/config/src/main/resources/usergrid-default.properties
@@ -259,6 +259,10 @@ elasticsearch.index_prefix=elasticsearch
 #
 #elasticsearch.buffer_size=1000
 
+# Set the maximum buffer size to use when performing re-index requests.
+#
+#elasticsearch.reindex.buffer_size=1000
+
 # Set the batch size to use when sending batched index write requests to Elasticsearch.
 #
 #elasticsearch.batch_size=1000
@@ -322,7 +326,7 @@ usergrid.twodoto.appinfo.migration=true
 
 # The number of worker threads used to read index write requests from the queue.
 #
-#elasticsearch.worker_count=1
+#elasticsearch.worker_count=8
 
 # Set the number of worker threads used for processing index write requests to
 # Elasticsearch from the buffer.

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/446a8e8a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index b32d594..e652405 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -29,6 +29,7 @@ import com.codahale.metrics.Histogram;
 import com.google.common.base.Preconditions;
 import org.apache.usergrid.corepersistence.CpEntityManager;
 import org.apache.usergrid.corepersistence.asyncevents.model.*;
+import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
 import org.apache.usergrid.utils.UUIDUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -143,6 +144,19 @@ public class AmazonAsyncEventService implements AsyncEventService {
         }
     }
 
+    private void offerBatch(final List operations){
+        final Timer.Context timer = this.writeTimer.time();
+
+        try {
+            //signal to SQS
+            this.queue.sendMessages(operations);
+        } catch (IOException e) {
+            throw new RuntimeException("Unable to queue message", e);
+        } finally {
+            timer.stop();
+        }
+    }
+
 
     /**
      * Take message from SQS
@@ -427,9 +441,18 @@ public class AmazonAsyncEventService implements AsyncEventService {
         }
     }
 
-    @Override
     public void index(final ApplicationScope applicationScope, final Id id, final long updatedSince) {
         //change to id scope to avoid serialization issues
         offer(new EntityIndexEvent(new EntityIdScope(applicationScope, id)));
     }
+
+    public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {
+
+        List batch = new ArrayList<EdgeScope>();
+        for ( EdgeScope e : edges){
+            batch.add(new EntityIndexEvent(new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode())));
+        }
+        //change to id scope to avoid serialization issues
+        offerBatch(batch);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/446a8e8a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index f035b43..b8e544d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -24,7 +24,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
-import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
@@ -35,6 +35,7 @@ import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
 import rx.Observable;
+import java.util.List;
 
 
 /**
@@ -95,13 +96,21 @@ public class InMemoryAsyncEventService implements AsyncEventService {
     }
 
 
-    @Override
     public void index( final ApplicationScope applicationScope, final Id id, final long updatedSince ) {
         final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, id, updatedSince );
 
         run(eventBuilder.index( entityIndexOperation ));
     }
 
+    public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {
+        for ( EdgeScope e : edges){
+            final EntityIndexOperation entityIndexOperation = new EntityIndexOperation(e.getApplicationScope(),
+                e.getEdge().getTargetNode(), updatedSince);
+
+            run(eventBuilder.index (entityIndexOperation));
+        }
+
+    }
 
     public void run( Observable<?> observable ) {
         //start it in the background on an i/o thread

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/446a8e8a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index c3942cc..34f8cb5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -40,6 +40,8 @@ public interface IndexProcessorFig extends GuicyFig {
 
     String INDEX_QUEUE_VISIBILITY_TIMEOUT = "elasticsearch.queue_visibility_timeout";
 
+    String REINDEX_BUFFER_SIZE = "elasticsearch.reindex.buffer_size";
+
 
     /**
      * Set the amount of time to wait when Elasticsearch rejects a requests before
@@ -66,7 +68,7 @@ public interface IndexProcessorFig extends GuicyFig {
     /**
      * The number of worker threads used to read index write requests from the queue.
      */
-    @Default( "1" )
+    @Default( "8" )
     @Key( ELASTICSEARCH_WORKER_COUNT )
     int getWorkerCount();
 
@@ -83,6 +85,10 @@ public interface IndexProcessorFig extends GuicyFig {
     @Key("elasticsearch.reindex.flush.interval")
     int getUpdateInterval();
 
+    @Default("1000")
+    @Key( REINDEX_BUFFER_SIZE )
+    int getReindexBufferSize();
+
     /**
      * Flag to resolve the LOCAL queue implementation service synchronously.
      */

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/446a8e8a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
index 8a74540..5e201fb 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
@@ -20,15 +20,15 @@
 package org.apache.usergrid.corepersistence.index;
 
 
-import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.model.entity.Id;
+import java.util.List;
 
 
 /**
  * Callback to perform an index operation based on an scope during bulk re-index operations
  */
-@FunctionalInterface
 public interface ReIndexAction {
 
     /**
@@ -37,4 +37,11 @@ public interface ReIndexAction {
      * @param id
      */
     void index( final ApplicationScope applicationScope, final Id id, final long updatedSince );
+
+    /**
+     * Index a batch list of entities.
+     * @param edges
+     * @param updatedSince
+     */
+    void indexBatch ( final List<EdgeScope> edges, final long updatedSince);
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/446a8e8a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index 5ee545e..1353982 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -120,28 +120,26 @@ public class ReIndexServiceImpl implements ReIndexService {
 
         final long modifiedSince = reIndexRequestBuilder.getUpdateTimestamp().or( Long.MIN_VALUE );
 
-        //create an observable that loads each entity and indexes it, start it running with publish
-        final Observable<EdgeScope> runningReIndex = allEntityIdsObservable.getEdgesToEntities( applicationScopes,
-            reIndexRequestBuilder.getCollectionName(), cursorSeek.getSeekValue() )
+        // create an observable that loads a batch to be indexed
 
-            //for each edge, create our scope and index on it
-            .doOnNext( edge -> {
+        final Observable<List<EdgeScope>> runningReIndex = allEntityIdsObservable.getEdgesToEntities( applicationScopes,
+            reIndexRequestBuilder.getCollectionName(), cursorSeek.getSeekValue() )
+            .buffer( indexProcessorFig.getReindexBufferSize())
+            .doOnNext(edges -> {
 
                 if(logger.isInfoEnabled()) {
-                    logger.info("Queueing {} {}", edge.getApplicationScope(), edge.getEdge().getTargetNode());
+                    logger.info("Sending batch of {} to be indexed.", edges.size());
                 }
+                indexService.indexBatch(edges, modifiedSince);
 
-                indexService.index(edge.getApplicationScope(),edge.getEdge().getTargetNode(), modifiedSince);
-
-            } );
+            });
 
 
         //start our sampler and state persistence
         //take a sample every sample interval to allow us to resume state with minimal loss
-        runningReIndex.buffer( indexProcessorFig.getUpdateInterval() )
-            //create our flushing collector and flush the edge scopes to it
-            .collect( () -> new FlushingCollector( jobId ),
-                ( ( flushingCollector, edgeScopes ) -> flushingCollector.flushBuffer( edgeScopes ) ) ).doOnNext( flushingCollector-> flushingCollector.complete() )
+        //create our flushing collector and flush the edge scopes to it
+        runningReIndex.collect(() -> new FlushingCollector(jobId),
+            ((flushingCollector, edgeScopes) -> flushingCollector.flushBuffer(edgeScopes))).doOnNext( flushingCollector-> flushingCollector.complete() )
                 //subscribe on our I/O scheduler and run the task
             .subscribeOn( Schedulers.io() ).subscribe();
 


[4/6] incubator-usergrid git commit: Update binary bucket name.

Posted by sf...@apache.org.
Update binary bucket name.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/67cfa20d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/67cfa20d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/67cfa20d

Branch: refs/heads/two-dot-o-dev
Commit: 67cfa20d524ab2e45cded79e57d902d7e95c035a
Parents: d29756a
Author: Michael Russo <mi...@gmail.com>
Authored: Thu Jul 2 09:40:35 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Thu Jul 2 09:40:35 2015 -0700

----------------------------------------------------------------------
 stack/config/src/main/resources/usergrid-default.properties | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/67cfa20d/stack/config/src/main/resources/usergrid-default.properties
----------------------------------------------------------------------
diff --git a/stack/config/src/main/resources/usergrid-default.properties b/stack/config/src/main/resources/usergrid-default.properties
index b5b0e8c..80e5503 100644
--- a/stack/config/src/main/resources/usergrid-default.properties
+++ b/stack/config/src/main/resources/usergrid-default.properties
@@ -467,7 +467,7 @@ usergrid.central.read.timeout=10000
 
 # Set the bucket name used for storing assets.
 #
-usergrid.binary.bucketname=usergrid-test
+usergrid.binary.bucketname=usergrid-binaries
 
 # Set the maximum size for a single asset (in MB).
 #


[6/6] incubator-usergrid git commit: Merge branch 'pr/297' into two-dot-o-dev

Posted by sf...@apache.org.
Merge branch 'pr/297' into two-dot-o-dev


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/6f2d0bf5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/6f2d0bf5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/6f2d0bf5

Branch: refs/heads/two-dot-o-dev
Commit: 6f2d0bf54cc040066c491379f9589c1f81b97661
Parents: 136135d 2edf681
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Jul 2 13:18:57 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Jul 2 13:18:57 2015 -0600

----------------------------------------------------------------------
 .../main/resources/usergrid-default.properties  |  8 +++--
 .../asyncevents/AmazonAsyncEventService.java    | 25 ++++++++++++-
 .../asyncevents/InMemoryAsyncEventService.java  | 13 +++++--
 .../index/IndexProcessorFig.java                |  8 ++++-
 .../corepersistence/index/ReIndexAction.java    | 11 ++++--
 .../index/ReIndexServiceImpl.java               | 24 ++++++-------
 .../queue/impl/SNSQueueManagerImpl.java         | 37 +++++++++++++++-----
 7 files changed, 96 insertions(+), 30 deletions(-)
----------------------------------------------------------------------



[5/6] incubator-usergrid git commit: Add and update comments.

Posted by sf...@apache.org.
Add and update comments.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/2edf6813
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/2edf6813
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/2edf6813

Branch: refs/heads/two-dot-o-dev
Commit: 2edf68131af1c2e47761e0d41341b2438bdc71a2
Parents: 67cfa20
Author: Michael Russo <mi...@gmail.com>
Authored: Thu Jul 2 10:35:33 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Thu Jul 2 10:35:33 2015 -0700

----------------------------------------------------------------------
 .../corepersistence/asyncevents/AmazonAsyncEventService.java  | 2 +-
 .../usergrid/persistence/queue/impl/SNSQueueManagerImpl.java  | 7 +++++++
 2 files changed, 8 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2edf6813/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index e652405..c5b836b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -450,9 +450,9 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
         List batch = new ArrayList<EdgeScope>();
         for ( EdgeScope e : edges){
+            //change to id scope to avoid serialization issues
             batch.add(new EntityIndexEvent(new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode())));
         }
-        //change to id scope to avoid serialization issues
         offerBatch(batch);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2edf6813/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 6c6cae9..257f25e 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -247,6 +247,13 @@ public class SNSQueueManagerImpl implements QueueManager {
 
     private AmazonSNSAsyncClient createSNSClient(final Region region) {
         final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
+
+        /**
+         * The Async client will use default client configurations (default max conn: 50)
+         * http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html
+         * http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/constant-values.html#com.amazonaws.ClientConfiguration.DEFAULT_MAX_CONNECTIONS
+         */
+
         final AmazonSNSAsyncClient sns = new AmazonSNSAsyncClient(ugProvider.getCredentials());
 
         sns.setRegion(region);


[3/6] incubator-usergrid git commit: Add async handler to SNSAsyncClient so we can log successes in debug and, more importantly, always log errors.

Posted by sf...@apache.org.
Add async handler to SNSAsyncClient so we can log successes in debug and, more importantly, always log errors.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/d29756a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/d29756a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/d29756a6

Branch: refs/heads/two-dot-o-dev
Commit: d29756a6f3e00f3b4e792fe1f15a4c5d7e757b35
Parents: ddeb700
Author: Michael Russo <mi...@gmail.com>
Authored: Thu Jul 2 09:24:58 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Thu Jul 2 09:24:58 2015 -0700

----------------------------------------------------------------------
 .../persistence/queue/impl/SNSQueueManagerImpl.java   | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d29756a6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 12e04ce..6c6cae9 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -19,6 +19,7 @@ package org.apache.usergrid.persistence.queue.impl;
 
 
 import com.amazonaws.AmazonServiceException;
+import com.amazonaws.handlers.AsyncHandler;
 import com.amazonaws.regions.Region;
 import com.amazonaws.regions.Regions;
 import com.amazonaws.services.sns.AmazonSNSAsyncClient;
@@ -365,9 +366,18 @@ public class SNSQueueManagerImpl implements QueueManager {
 
         PublishRequest publishRequest = new PublishRequest(topicArn, toString(body));
 
-        sns.publishAsync(publishRequest);
+        sns.publishAsync(publishRequest, new AsyncHandler<PublishRequest, PublishResult>() {
+                @Override
+                public void onError(Exception e) {
+                    logger.error("Error publishing message... {}", e);
+                }
+
+                @Override
+                public void onSuccess(PublishRequest request, PublishResult result) {
+                    if (logger.isDebugEnabled()) logger.debug("Successfully published... messageID=[{}],  arn=[{}]", result.getMessageId(), request.getTopicArn());
 
-        // see about implementing asyncHandler for publishAsync in future
+                }
+            });
 
     }
 


[2/6] incubator-usergrid git commit: Convert Amazon SNS client to the async SNS client.

Posted by sf...@apache.org.
Convert Amazon SNS client to the async SNS client.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/ddeb700b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/ddeb700b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/ddeb700b

Branch: refs/heads/two-dot-o-dev
Commit: ddeb700be52e06cf5a8eb722c6238f943317f3a3
Parents: 446a8e8
Author: Michael Russo <mi...@gmail.com>
Authored: Wed Jul 1 17:23:59 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Wed Jul 1 17:23:59 2015 -0700

----------------------------------------------------------------------
 .../queue/impl/SNSQueueManagerImpl.java         | 20 +++++++++++---------
 1 file changed, 11 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ddeb700b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 4812e55..12e04ce 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -21,7 +21,7 @@ package org.apache.usergrid.persistence.queue.impl;
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.regions.Region;
 import com.amazonaws.regions.Regions;
-import com.amazonaws.services.sns.AmazonSNSClient;
+import com.amazonaws.services.sns.AmazonSNSAsyncClient;
 import com.amazonaws.services.sns.model.*;
 import com.amazonaws.services.sns.util.Topics;
 import com.amazonaws.services.sqs.AmazonSQSClient;
@@ -54,7 +54,7 @@ public class SNSQueueManagerImpl implements QueueManager {
     private final QueueFig fig;
     private final ClusterFig clusterFig;
     private final AmazonSQSClient sqs;
-    private final AmazonSNSClient sns;
+    private final AmazonSNSAsyncClient sns;
 
 
     private final JsonFactory JSON_FACTORY = new JsonFactory();
@@ -171,7 +171,7 @@ public class SNSQueueManagerImpl implements QueueManager {
                 Region region = Region.getRegion(regions);
 
                 final AmazonSQSClient sqsClient = createSQSClient(region);
-                final AmazonSNSClient snsClient = createSNSClient(region);
+                final AmazonSNSAsyncClient snsClient = createSNSClient(region);
 
                 String topicArn = AmazonNotificationUtils.getTopicArn(snsClient, queueName, true);
 
@@ -210,7 +210,7 @@ public class SNSQueueManagerImpl implements QueueManager {
                     Regions snsRegions = Regions.fromName(strSnsRegion);
                     Region snsRegion = Region.getRegion(snsRegions);
 
-                    final AmazonSNSClient snsClient = createSNSClient(snsRegion);
+                    final AmazonSNSAsyncClient snsClient = createSNSClient(snsRegion);
 
                     try {
 
@@ -244,9 +244,9 @@ public class SNSQueueManagerImpl implements QueueManager {
     }
 
 
-    private AmazonSNSClient createSNSClient(final Region region) {
+    private AmazonSNSAsyncClient createSNSClient(final Region region) {
         final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
-        final AmazonSNSClient sns = new AmazonSNSClient(ugProvider.getCredentials());
+        final AmazonSNSAsyncClient sns = new AmazonSNSAsyncClient(ugProvider.getCredentials());
 
         sns.setRegion(region);
 
@@ -363,10 +363,12 @@ public class SNSQueueManagerImpl implements QueueManager {
 
         if (logger.isDebugEnabled()) logger.debug("Publishing Message...{} to arn: {}", stringBody, topicArn);
 
-        PublishResult publishResult = sns.publish(topicArn, toString(body));
+        PublishRequest publishRequest = new PublishRequest(topicArn, toString(body));
+
+        sns.publishAsync(publishRequest);
+
+        // see about implementing asyncHandler for publishAsync in future
 
-        if (logger.isDebugEnabled())
-            logger.debug("Published Message ID: {} to arn: {}", publishResult.getMessageId(), topicArn);
     }