You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/07/02 21:28:56 UTC
[1/6] incubator-usergrid git commit: Add batching to re-index
requests.
Repository: incubator-usergrid
Updated Branches:
refs/heads/two-dot-o-dev 136135d0c -> 6f2d0bf54
Add batching to re-index requests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/446a8e8a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/446a8e8a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/446a8e8a
Branch: refs/heads/two-dot-o-dev
Commit: 446a8e8af99aca59934400caa64c3b021bcb460e
Parents: 03d0534
Author: Michael Russo <mi...@gmail.com>
Authored: Wed Jul 1 17:23:34 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Wed Jul 1 17:23:34 2015 -0700
----------------------------------------------------------------------
.../main/resources/usergrid-default.properties | 6 ++++-
.../asyncevents/AmazonAsyncEventService.java | 25 +++++++++++++++++++-
.../asyncevents/InMemoryAsyncEventService.java | 13 ++++++++--
.../index/IndexProcessorFig.java | 8 ++++++-
.../corepersistence/index/ReIndexAction.java | 11 +++++++--
.../index/ReIndexServiceImpl.java | 24 +++++++++----------
6 files changed, 67 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/446a8e8a/stack/config/src/main/resources/usergrid-default.properties
----------------------------------------------------------------------
diff --git a/stack/config/src/main/resources/usergrid-default.properties b/stack/config/src/main/resources/usergrid-default.properties
index d6ce318..b5b0e8c 100644
--- a/stack/config/src/main/resources/usergrid-default.properties
+++ b/stack/config/src/main/resources/usergrid-default.properties
@@ -259,6 +259,10 @@ elasticsearch.index_prefix=elasticsearch
#
#elasticsearch.buffer_size=1000
+# Set the maximum buffer size to use when performing re-index requests.
+#
+#elasticsearch.reindex.buffer_size=1000
+
# Set the batch size to use when sending batched index write requests to Elasticsearch.
#
#elasticsearch.batch_size=1000
@@ -322,7 +326,7 @@ usergrid.twodoto.appinfo.migration=true
# The number of worker threads used to read index write requests from the queue.
#
-#elasticsearch.worker_count=1
+#elasticsearch.worker_count=8
# Set the number of worker threads used for processing index write requests to
# Elasticsearch from the buffer.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/446a8e8a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index b32d594..e652405 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -29,6 +29,7 @@ import com.codahale.metrics.Histogram;
import com.google.common.base.Preconditions;
import org.apache.usergrid.corepersistence.CpEntityManager;
import org.apache.usergrid.corepersistence.asyncevents.model.*;
+import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
import org.apache.usergrid.utils.UUIDUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -143,6 +144,19 @@ public class AmazonAsyncEventService implements AsyncEventService {
}
}
+ private void offerBatch(final List operations){
+ final Timer.Context timer = this.writeTimer.time();
+
+ try {
+ //signal to SQS
+ this.queue.sendMessages(operations);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to queue message", e);
+ } finally {
+ timer.stop();
+ }
+ }
+
/**
* Take message from SQS
@@ -427,9 +441,18 @@ public class AmazonAsyncEventService implements AsyncEventService {
}
}
- @Override
public void index(final ApplicationScope applicationScope, final Id id, final long updatedSince) {
//change to id scope to avoid serialization issues
offer(new EntityIndexEvent(new EntityIdScope(applicationScope, id)));
}
+
+ public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {
+
+ List batch = new ArrayList<EdgeScope>();
+ for ( EdgeScope e : edges){
+ batch.add(new EntityIndexEvent(new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode())));
+ }
+ //change to id scope to avoid serialization issues
+ offerBatch(batch);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/446a8e8a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index f035b43..b8e544d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -24,7 +24,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
-import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
@@ -35,6 +35,7 @@ import com.google.inject.Inject;
import com.google.inject.Singleton;
import rx.Observable;
+import java.util.List;
/**
@@ -95,13 +96,21 @@ public class InMemoryAsyncEventService implements AsyncEventService {
}
- @Override
public void index( final ApplicationScope applicationScope, final Id id, final long updatedSince ) {
final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, id, updatedSince );
run(eventBuilder.index( entityIndexOperation ));
}
+ public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {
+ for ( EdgeScope e : edges){
+ final EntityIndexOperation entityIndexOperation = new EntityIndexOperation(e.getApplicationScope(),
+ e.getEdge().getTargetNode(), updatedSince);
+
+ run(eventBuilder.index (entityIndexOperation));
+ }
+
+ }
public void run( Observable<?> observable ) {
//start it in the background on an i/o thread
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/446a8e8a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index c3942cc..34f8cb5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -40,6 +40,8 @@ public interface IndexProcessorFig extends GuicyFig {
String INDEX_QUEUE_VISIBILITY_TIMEOUT = "elasticsearch.queue_visibility_timeout";
+ String REINDEX_BUFFER_SIZE = "elasticsearch.reindex.buffer_size";
+
/**
* Set the amount of time to wait when Elasticsearch rejects a request before
@@ -66,7 +68,7 @@ public interface IndexProcessorFig extends GuicyFig {
/**
* The number of worker threads used to read index write requests from the queue.
*/
- @Default( "1" )
+ @Default( "8" )
@Key( ELASTICSEARCH_WORKER_COUNT )
int getWorkerCount();
@@ -83,6 +85,10 @@ public interface IndexProcessorFig extends GuicyFig {
@Key("elasticsearch.reindex.flush.interval")
int getUpdateInterval();
+ @Default("1000")
+ @Key( REINDEX_BUFFER_SIZE )
+ int getReindexBufferSize();
+
/**
* Flag to resolve the LOCAL queue implementation service synchronously.
*/
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/446a8e8a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
index 8a74540..5e201fb 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
@@ -20,15 +20,15 @@
package org.apache.usergrid.corepersistence.index;
-import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.model.entity.Id;
+import java.util.List;
/**
* Callback to perform an index operation based on a scope during bulk re-index operations
*/
-@FunctionalInterface
public interface ReIndexAction {
/**
@@ -37,4 +37,11 @@ public interface ReIndexAction {
* @param id
*/
void index( final ApplicationScope applicationScope, final Id id, final long updatedSince );
+
+ /**
+ * Index a batch list of entities.
+ * @param edges
+ * @param updatedSince
+ */
+ void indexBatch ( final List<EdgeScope> edges, final long updatedSince);
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/446a8e8a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index 5ee545e..1353982 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -120,28 +120,26 @@ public class ReIndexServiceImpl implements ReIndexService {
final long modifiedSince = reIndexRequestBuilder.getUpdateTimestamp().or( Long.MIN_VALUE );
- //create an observable that loads each entity and indexes it, start it running with publish
- final Observable<EdgeScope> runningReIndex = allEntityIdsObservable.getEdgesToEntities( applicationScopes,
- reIndexRequestBuilder.getCollectionName(), cursorSeek.getSeekValue() )
+ // create an observable that loads a batch to be indexed
- //for each edge, create our scope and index on it
- .doOnNext( edge -> {
+ final Observable<List<EdgeScope>> runningReIndex = allEntityIdsObservable.getEdgesToEntities( applicationScopes,
+ reIndexRequestBuilder.getCollectionName(), cursorSeek.getSeekValue() )
+ .buffer( indexProcessorFig.getReindexBufferSize())
+ .doOnNext(edges -> {
if(logger.isInfoEnabled()) {
- logger.info("Queueing {} {}", edge.getApplicationScope(), edge.getEdge().getTargetNode());
+ logger.info("Sending batch of {} to be indexed.", edges.size());
}
+ indexService.indexBatch(edges, modifiedSince);
- indexService.index(edge.getApplicationScope(),edge.getEdge().getTargetNode(), modifiedSince);
-
- } );
+ });
//start our sampler and state persistence
//take a sample every sample interval to allow us to resume state with minimal loss
- runningReIndex.buffer( indexProcessorFig.getUpdateInterval() )
- //create our flushing collector and flush the edge scopes to it
- .collect( () -> new FlushingCollector( jobId ),
- ( ( flushingCollector, edgeScopes ) -> flushingCollector.flushBuffer( edgeScopes ) ) ).doOnNext( flushingCollector-> flushingCollector.complete() )
+ //create our flushing collector and flush the edge scopes to it
+ runningReIndex.collect(() -> new FlushingCollector(jobId),
+ ((flushingCollector, edgeScopes) -> flushingCollector.flushBuffer(edgeScopes))).doOnNext( flushingCollector-> flushingCollector.complete() )
//subscribe on our I/O scheduler and run the task
.subscribeOn( Schedulers.io() ).subscribe();
[4/6] incubator-usergrid git commit: Update binary bucket name.
Posted by sf...@apache.org.
Update binary bucket name.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/67cfa20d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/67cfa20d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/67cfa20d
Branch: refs/heads/two-dot-o-dev
Commit: 67cfa20d524ab2e45cded79e57d902d7e95c035a
Parents: d29756a
Author: Michael Russo <mi...@gmail.com>
Authored: Thu Jul 2 09:40:35 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Thu Jul 2 09:40:35 2015 -0700
----------------------------------------------------------------------
stack/config/src/main/resources/usergrid-default.properties | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/67cfa20d/stack/config/src/main/resources/usergrid-default.properties
----------------------------------------------------------------------
diff --git a/stack/config/src/main/resources/usergrid-default.properties b/stack/config/src/main/resources/usergrid-default.properties
index b5b0e8c..80e5503 100644
--- a/stack/config/src/main/resources/usergrid-default.properties
+++ b/stack/config/src/main/resources/usergrid-default.properties
@@ -467,7 +467,7 @@ usergrid.central.read.timeout=10000
# Set the bucket name used for storing assets.
#
-usergrid.binary.bucketname=usergrid-test
+usergrid.binary.bucketname=usergrid-binaries
# Set the maximum size for a single asset (in MB).
#
[6/6] incubator-usergrid git commit: Merge branch 'pr/297' into
two-dot-o-dev
Posted by sf...@apache.org.
Merge branch 'pr/297' into two-dot-o-dev
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/6f2d0bf5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/6f2d0bf5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/6f2d0bf5
Branch: refs/heads/two-dot-o-dev
Commit: 6f2d0bf54cc040066c491379f9589c1f81b97661
Parents: 136135d 2edf681
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Jul 2 13:18:57 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Jul 2 13:18:57 2015 -0600
----------------------------------------------------------------------
.../main/resources/usergrid-default.properties | 8 +++--
.../asyncevents/AmazonAsyncEventService.java | 25 ++++++++++++-
.../asyncevents/InMemoryAsyncEventService.java | 13 +++++--
.../index/IndexProcessorFig.java | 8 ++++-
.../corepersistence/index/ReIndexAction.java | 11 ++++--
.../index/ReIndexServiceImpl.java | 24 ++++++-------
.../queue/impl/SNSQueueManagerImpl.java | 37 +++++++++++++++-----
7 files changed, 96 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
[5/6] incubator-usergrid git commit: Add and update comments.
Posted by sf...@apache.org.
Add and update comments.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/2edf6813
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/2edf6813
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/2edf6813
Branch: refs/heads/two-dot-o-dev
Commit: 2edf68131af1c2e47761e0d41341b2438bdc71a2
Parents: 67cfa20
Author: Michael Russo <mi...@gmail.com>
Authored: Thu Jul 2 10:35:33 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Thu Jul 2 10:35:33 2015 -0700
----------------------------------------------------------------------
.../corepersistence/asyncevents/AmazonAsyncEventService.java | 2 +-
.../usergrid/persistence/queue/impl/SNSQueueManagerImpl.java | 7 +++++++
2 files changed, 8 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2edf6813/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index e652405..c5b836b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -450,9 +450,9 @@ public class AmazonAsyncEventService implements AsyncEventService {
List batch = new ArrayList<EdgeScope>();
for ( EdgeScope e : edges){
+ //change to id scope to avoid serialization issues
batch.add(new EntityIndexEvent(new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode())));
}
- //change to id scope to avoid serialization issues
offerBatch(batch);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2edf6813/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 6c6cae9..257f25e 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -247,6 +247,13 @@ public class SNSQueueManagerImpl implements QueueManager {
private AmazonSNSAsyncClient createSNSClient(final Region region) {
final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
+
+ /**
+ * The Async client will use default client configurations (default max conn: 50)
+ * http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html
+ * http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/constant-values.html#com.amazonaws.ClientConfiguration.DEFAULT_MAX_CONNECTIONS
+ */
+
final AmazonSNSAsyncClient sns = new AmazonSNSAsyncClient(ugProvider.getCredentials());
sns.setRegion(region);
[3/6] incubator-usergrid git commit: Add async handler to
SNSAsyncClient so we can log successes in debug and, more importantly,
always log errors.
Posted by sf...@apache.org.
Add async handler to SNSAsyncClient so we can log successes in debug and, more importantly, always log errors.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/d29756a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/d29756a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/d29756a6
Branch: refs/heads/two-dot-o-dev
Commit: d29756a6f3e00f3b4e792fe1f15a4c5d7e757b35
Parents: ddeb700
Author: Michael Russo <mi...@gmail.com>
Authored: Thu Jul 2 09:24:58 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Thu Jul 2 09:24:58 2015 -0700
----------------------------------------------------------------------
.../persistence/queue/impl/SNSQueueManagerImpl.java | 14 ++++++++++++--
1 file changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d29756a6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 12e04ce..6c6cae9 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -19,6 +19,7 @@ package org.apache.usergrid.persistence.queue.impl;
import com.amazonaws.AmazonServiceException;
+import com.amazonaws.handlers.AsyncHandler;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sns.AmazonSNSAsyncClient;
@@ -365,9 +366,18 @@ public class SNSQueueManagerImpl implements QueueManager {
PublishRequest publishRequest = new PublishRequest(topicArn, toString(body));
- sns.publishAsync(publishRequest);
+ sns.publishAsync(publishRequest, new AsyncHandler<PublishRequest, PublishResult>() {
+ @Override
+ public void onError(Exception e) {
+ logger.error("Error publishing message... {}", e);
+ }
+
+ @Override
+ public void onSuccess(PublishRequest request, PublishResult result) {
+ if (logger.isDebugEnabled()) logger.debug("Successfully published... messageID=[{}], arn=[{}]", result.getMessageId(), request.getTopicArn());
- // see about implementing asyncHandler for publishAsync in future
+ }
+ });
}
[2/6] incubator-usergrid git commit: Convert Amazon SNS client to the
async SNS client.
Posted by sf...@apache.org.
Convert Amazon SNS client to the async SNS client.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/ddeb700b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/ddeb700b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/ddeb700b
Branch: refs/heads/two-dot-o-dev
Commit: ddeb700be52e06cf5a8eb722c6238f943317f3a3
Parents: 446a8e8
Author: Michael Russo <mi...@gmail.com>
Authored: Wed Jul 1 17:23:59 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Wed Jul 1 17:23:59 2015 -0700
----------------------------------------------------------------------
.../queue/impl/SNSQueueManagerImpl.java | 20 +++++++++++---------
1 file changed, 11 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ddeb700b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 4812e55..12e04ce 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -21,7 +21,7 @@ package org.apache.usergrid.persistence.queue.impl;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
-import com.amazonaws.services.sns.AmazonSNSClient;
+import com.amazonaws.services.sns.AmazonSNSAsyncClient;
import com.amazonaws.services.sns.model.*;
import com.amazonaws.services.sns.util.Topics;
import com.amazonaws.services.sqs.AmazonSQSClient;
@@ -54,7 +54,7 @@ public class SNSQueueManagerImpl implements QueueManager {
private final QueueFig fig;
private final ClusterFig clusterFig;
private final AmazonSQSClient sqs;
- private final AmazonSNSClient sns;
+ private final AmazonSNSAsyncClient sns;
private final JsonFactory JSON_FACTORY = new JsonFactory();
@@ -171,7 +171,7 @@ public class SNSQueueManagerImpl implements QueueManager {
Region region = Region.getRegion(regions);
final AmazonSQSClient sqsClient = createSQSClient(region);
- final AmazonSNSClient snsClient = createSNSClient(region);
+ final AmazonSNSAsyncClient snsClient = createSNSClient(region);
String topicArn = AmazonNotificationUtils.getTopicArn(snsClient, queueName, true);
@@ -210,7 +210,7 @@ public class SNSQueueManagerImpl implements QueueManager {
Regions snsRegions = Regions.fromName(strSnsRegion);
Region snsRegion = Region.getRegion(snsRegions);
- final AmazonSNSClient snsClient = createSNSClient(snsRegion);
+ final AmazonSNSAsyncClient snsClient = createSNSClient(snsRegion);
try {
@@ -244,9 +244,9 @@ public class SNSQueueManagerImpl implements QueueManager {
}
- private AmazonSNSClient createSNSClient(final Region region) {
+ private AmazonSNSAsyncClient createSNSClient(final Region region) {
final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
- final AmazonSNSClient sns = new AmazonSNSClient(ugProvider.getCredentials());
+ final AmazonSNSAsyncClient sns = new AmazonSNSAsyncClient(ugProvider.getCredentials());
sns.setRegion(region);
@@ -363,10 +363,12 @@ public class SNSQueueManagerImpl implements QueueManager {
if (logger.isDebugEnabled()) logger.debug("Publishing Message...{} to arn: {}", stringBody, topicArn);
- PublishResult publishResult = sns.publish(topicArn, toString(body));
+ PublishRequest publishRequest = new PublishRequest(topicArn, toString(body));
+
+ sns.publishAsync(publishRequest);
+
+ // see about implementing asyncHandler for publishAsync in future
- if (logger.isDebugEnabled())
- logger.debug("Published Message ID: {} to arn: {}", publishResult.getMessageId(), topicArn);
}