You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/09/16 22:34:35 UTC
[06/25] usergrid git commit: Move under
org.apache.usergrid.persistence package in preparation for integration into
Usergrid.
Move under org.apache.usergrid.persistence package in preparation for integration into Usergrid.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/9016fd29
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/9016fd29
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/9016fd29
Branch: refs/heads/usergrid-1318-queue
Commit: 9016fd290e80b88ba3199c0020fca7722b82d780
Parents: 1574b3a
Author: Dave Johnson <sn...@apache.org>
Authored: Fri Sep 9 17:31:44 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Fri Sep 9 17:31:44 2016 -0400
----------------------------------------------------------------------
.../asyncevents/AsyncEventServiceImpl.java | 58 +++----
.../asyncevents/AsyncIndexProvider.java | 12 +-
.../index/AsyncEventServiceImplTest.java | 8 +-
.../usergrid/mq/LegacyQueuePathsTest.java | 48 ++++++
.../org/apache/usergrid/mq/QueuePathsTest.java | 48 ------
.../usergrid/persistence/queue/LegacyQueue.java | 35 +++++
.../persistence/queue/LegacyQueueFig.java | 106 +++++++++++++
.../persistence/queue/LegacyQueueManager.java | 79 ++++++++++
.../queue/LegacyQueueManagerFactory.java | 23 +++
.../LegacyQueueManagerInternalFactory.java | 28 ++++
.../persistence/queue/LegacyQueueMessage.java | 70 +++++++++
.../persistence/queue/LegacyQueueScope.java | 45 ++++++
.../persistence/queue/LocalQueueManager.java | 18 +--
.../usergrid/persistence/queue/Queue.java | 35 -----
.../usergrid/persistence/queue/QueueFig.java | 106 -------------
.../persistence/queue/QueueManager.java | 79 ----------
.../persistence/queue/QueueManagerFactory.java | 23 ---
.../queue/QueueManagerInternalFactory.java | 28 ----
.../persistence/queue/QueueMessage.java | 70 ---------
.../usergrid/persistence/queue/QueueScope.java | 45 ------
.../persistence/queue/guice/QueueModule.java | 16 +-
.../queue/impl/LegacyQueueScopeImpl.java | 67 ++++++++
.../queue/impl/QueueManagerFactoryImpl.java | 20 +--
.../persistence/queue/impl/QueueScopeImpl.java | 68 --------
.../queue/impl/SNSQueueManagerImpl.java | 48 +++---
.../queue/util/AmazonNotificationUtils.java | 4 +-
.../queue/LegacyQueueManagerTest.java | 156 +++++++++++++++++++
.../persistence/queue/QueueManagerTest.java | 156 -------------------
.../notifications/ApplicationQueueManager.java | 4 +-
.../ApplicationQueueManagerCache.java | 6 +-
.../notifications/NotificationsService.java | 19 ++-
.../services/notifications/QueueJob.java | 1 -
.../services/notifications/QueueListener.java | 29 ++--
.../impl/ApplicationQueueManagerImpl.java | 18 +--
.../services/queues/ImportQueueListener.java | 8 +-
.../services/queues/ImportQueueManager.java | 12 +-
.../usergrid/services/queues/QueueListener.java | 18 +--
37 files changed, 803 insertions(+), 811 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 0bff887..6add88c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -57,12 +57,12 @@ import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-import org.apache.usergrid.persistence.queue.QueueFig;
-import org.apache.usergrid.persistence.queue.QueueManager;
-import org.apache.usergrid.persistence.queue.QueueManagerFactory;
-import org.apache.usergrid.persistence.queue.QueueMessage;
-import org.apache.usergrid.persistence.queue.QueueScope;
-import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
+import org.apache.usergrid.persistence.queue.LegacyQueueFig;
+import org.apache.usergrid.persistence.queue.LegacyQueueManager;
+import org.apache.usergrid.persistence.queue.LegacyQueueManagerFactory;
+import org.apache.usergrid.persistence.queue.LegacyQueueMessage;
+import org.apache.usergrid.persistence.queue.LegacyQueueScope;
+import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
@@ -104,9 +104,9 @@ public class AsyncEventServiceImpl implements AsyncEventService {
public int MAX_TAKE = 10;
public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars
- private final QueueManager queue;
+ private final LegacyQueueManager queue;
private final IndexProcessorFig indexProcessorFig;
- private final QueueFig queueFig;
+ private final LegacyQueueFig queueFig;
private final IndexProducer indexProducer;
private final EntityCollectionManagerFactory entityCollectionManagerFactory;
private final IndexLocationStrategyFactory indexLocationStrategyFactory;
@@ -134,7 +134,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
@Inject
- public AsyncEventServiceImpl(final QueueManagerFactory queueManagerFactory,
+ public AsyncEventServiceImpl(final LegacyQueueManagerFactory queueManagerFactory,
final IndexProcessorFig indexProcessorFig,
final IndexProducer indexProducer,
final MetricsFactory metricsFactory,
@@ -143,7 +143,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
final EntityIndexFactory entityIndexFactory,
final EventBuilder eventBuilder,
final MapManagerFactory mapManagerFactory,
- final QueueFig queueFig,
+ final LegacyQueueFig queueFig,
@EventExecutionScheduler
final RxTaskScheduler rxTaskScheduler ) {
this.indexProducer = indexProducer;
@@ -159,7 +159,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
this.rxTaskScheduler = rxTaskScheduler;
- QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL);
+ LegacyQueueScope queueScope = new LegacyQueueScopeImpl(QUEUE_NAME, LegacyQueueScope.RegionImplementation.ALL);
this.queue = queueManagerFactory.getQueueManager(queueScope);
this.indexProcessorFig = indexProcessorFig;
@@ -233,7 +233,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
/**
* Take message from SQS
*/
- private List<QueueMessage> take() {
+ private List<LegacyQueueMessage> take() {
final Timer.Context timer = this.readTimer.time();
@@ -251,7 +251,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
/**
* Ack message in SQS
*/
- public void ack(final List<QueueMessage> messages) {
+ public void ack(final List<LegacyQueueMessage> messages) {
final Timer.Context timer = this.ackTimer.time();
@@ -275,7 +275,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
* @param messages
* @return
*/
- private List<IndexEventResult> callEventHandlers(final List<QueueMessage> messages) {
+ private List<IndexEventResult> callEventHandlers(final List<LegacyQueueMessage> messages) {
if (logger.isDebugEnabled()) {
logger.debug("callEventHandlers with {} message(s)", messages.size());
@@ -422,7 +422,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
}
- private IndexOperationMessage handleEntityIndexUpdate(final QueueMessage message) {
+ private IndexOperationMessage handleEntityIndexUpdate(final LegacyQueueMessage message) {
Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEntityIndexUpdate" );
@@ -457,7 +457,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
}
- private IndexOperationMessage handleEdgeIndex(final QueueMessage message) {
+ private IndexOperationMessage handleEdgeIndex(final LegacyQueueMessage message) {
Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEdgeIndex" );
@@ -486,7 +486,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ) );
}
- private IndexOperationMessage handleEdgeDelete(final QueueMessage message) {
+ private IndexOperationMessage handleEdgeDelete(final LegacyQueueMessage message) {
Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEdgeDelete" );
@@ -668,7 +668,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ) );
}
- private IndexOperationMessage handleEntityDelete(final QueueMessage message) {
+ private IndexOperationMessage handleEntityDelete(final LegacyQueueMessage message) {
Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityDelete");
@@ -700,7 +700,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
}
- private void handleInitializeApplicationIndex(final AsyncEvent event, final QueueMessage message) {
+ private void handleInitializeApplicationIndex(final AsyncEvent event, final LegacyQueueMessage message) {
Preconditions.checkNotNull(message, "Queue Message cannot be null for handleInitializeApplicationIndex");
Preconditions.checkArgument(event instanceof InitializeApplicationIndexEvent, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getClass()));
@@ -741,15 +741,15 @@ public class AsyncEventServiceImpl implements AsyncEventService {
private void startWorker() {
synchronized (mutex) {
- Observable<List<QueueMessage>> consumer =
- Observable.create( new Observable.OnSubscribe<List<QueueMessage>>() {
+ Observable<List<LegacyQueueMessage>> consumer =
+ Observable.create( new Observable.OnSubscribe<List<LegacyQueueMessage>>() {
@Override
- public void call( final Subscriber<? super List<QueueMessage>> subscriber ) {
+ public void call( final Subscriber<? super List<LegacyQueueMessage>> subscriber ) {
//name our thread so it's easy to see
Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() );
- List<QueueMessage> drainList = null;
+ List<LegacyQueueMessage> drainList = null;
do {
try {
@@ -799,7 +799,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
List<IndexEventResult> indexEventResults = callEventHandlers( messages );
// submit the processed messages to index producer
- List<QueueMessage> messagesToAck = submitToIndex( indexEventResults );
+ List<LegacyQueueMessage> messagesToAck = submitToIndex( indexEventResults );
if ( messagesToAck.size() < messages.size() ) {
logger.warn( "Missing {} message(s) from index processing",
@@ -834,7 +834,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
* Submit results to index and return the queue messages to be ack'd
*
*/
- private List<QueueMessage> submitToIndex(List<IndexEventResult> indexEventResults) {
+ private List<LegacyQueueMessage> submitToIndex(List<IndexEventResult> indexEventResults) {
// if nothing came back then return empty list
if(indexEventResults==null){
@@ -842,7 +842,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
}
IndexOperationMessage combined = new IndexOperationMessage();
- List<QueueMessage> queueMessages = indexEventResults.stream()
+ List<LegacyQueueMessage> queueMessages = indexEventResults.stream()
// filter out messages that are not present, they were not processed and put into the results
.filter( result -> result.getQueueMessage().isPresent() )
@@ -898,10 +898,10 @@ public class AsyncEventServiceImpl implements AsyncEventService {
public class IndexEventResult{
private final Optional<IndexOperationMessage> indexOperationMessage;
- private final Optional<QueueMessage> queueMessage;
+ private final Optional<LegacyQueueMessage> queueMessage;
private final long creationTime;
- public IndexEventResult(Optional<IndexOperationMessage> indexOperationMessage, Optional<QueueMessage> queueMessage, long creationTime){
+ public IndexEventResult(Optional<IndexOperationMessage> indexOperationMessage, Optional<LegacyQueueMessage> queueMessage, long creationTime){
this.queueMessage = queueMessage;
this.creationTime = creationTime;
@@ -912,7 +912,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
return indexOperationMessage;
}
- public Optional<QueueMessage> getQueueMessage() {
+ public Optional<LegacyQueueMessage> getQueueMessage() {
return queueMessage;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index abd4ce1..aac0e66 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -29,8 +29,8 @@ import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.impl.IndexProducer;
import org.apache.usergrid.persistence.queue.LocalQueueManager;
import org.apache.usergrid.persistence.map.MapManagerFactory;
-import org.apache.usergrid.persistence.queue.QueueFig;
-import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+import org.apache.usergrid.persistence.queue.LegacyQueueFig;
+import org.apache.usergrid.persistence.queue.LegacyQueueManagerFactory;
import com.google.inject.Inject;
import com.google.inject.Provider;
@@ -45,7 +45,7 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
private final IndexProcessorFig indexProcessorFig;
- private final QueueManagerFactory queueManagerFactory;
+ private final LegacyQueueManagerFactory queueManagerFactory;
private final MetricsFactory metricsFactory;
private final RxTaskScheduler rxTaskScheduler;
private final EntityCollectionManagerFactory entityCollectionManagerFactory;
@@ -54,14 +54,14 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
private final EntityIndexFactory entityIndexFactory;
private final IndexProducer indexProducer;
private final MapManagerFactory mapManagerFactory;
- private final QueueFig queueFig;
+ private final LegacyQueueFig queueFig;
private AsyncEventService asyncEventService;
@Inject
public AsyncIndexProvider(final IndexProcessorFig indexProcessorFig,
- final QueueManagerFactory queueManagerFactory,
+ final LegacyQueueManagerFactory queueManagerFactory,
final MetricsFactory metricsFactory,
@EventExecutionScheduler final RxTaskScheduler rxTaskScheduler,
final EntityCollectionManagerFactory entityCollectionManagerFactory,
@@ -70,7 +70,7 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
final EntityIndexFactory entityIndexFactory,
final IndexProducer indexProducer,
final MapManagerFactory mapManagerFactory,
- final QueueFig queueFig) {
+ final LegacyQueueFig queueFig) {
this.indexProcessorFig = indexProcessorFig;
this.queueManagerFactory = queueManagerFactory;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncEventServiceImplTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncEventServiceImplTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncEventServiceImplTest.java
index 4c0058b..92b5983 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncEventServiceImplTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncEventServiceImplTest.java
@@ -34,8 +34,8 @@ import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.impl.EsRunner;
import org.apache.usergrid.persistence.index.impl.IndexProducer;
import org.apache.usergrid.persistence.map.MapManagerFactory;
-import org.apache.usergrid.persistence.queue.QueueFig;
-import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+import org.apache.usergrid.persistence.queue.LegacyQueueFig;
+import org.apache.usergrid.persistence.queue.LegacyQueueManagerFactory;
import org.junit.Rule;
import org.junit.runner.RunWith;
@@ -53,13 +53,13 @@ public class AsyncEventServiceImplTest extends AsyncIndexServiceTest {
@Inject
- public QueueManagerFactory queueManagerFactory;
+ public LegacyQueueManagerFactory queueManagerFactory;
@Inject
public IndexProcessorFig indexProcessorFig;
@Inject
- public QueueFig queueFig;
+ public LegacyQueueFig queueFig;
@Inject
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/core/src/test/java/org/apache/usergrid/mq/LegacyQueuePathsTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/mq/LegacyQueuePathsTest.java b/stack/core/src/test/java/org/apache/usergrid/mq/LegacyQueuePathsTest.java
new file mode 100644
index 0000000..5ffa553
--- /dev/null
+++ b/stack/core/src/test/java/org/apache/usergrid/mq/LegacyQueuePathsTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.mq;
+
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.usergrid.mq.Queue.getQueueParentPaths;
+import static org.apache.usergrid.mq.Queue.normalizeQueuePath;
+import static org.apache.usergrid.utils.JsonUtils.mapToFormattedJsonString;
+
+
+
+public class LegacyQueuePathsTest {
+ private static final Logger logger = LoggerFactory.getLogger( LegacyQueuePathsTest.class );
+
+
+ @Test
+ // TODO - why does this test case not have assertions to test results?
+ // tests should not be written like this: what's the point? If it's
+ // code coverage this is still bad.
+ public void testPaths() throws Exception {
+ logger.info( normalizeQueuePath( "a/b/c" ) );
+ logger.info( normalizeQueuePath( "a/b/c/" ) );
+ logger.info( normalizeQueuePath( "/a/b/c" ) );
+ logger.info( normalizeQueuePath( "/////a/b/c" ) );
+ logger.info( normalizeQueuePath( "/" ) );
+
+ logger.info( mapToFormattedJsonString( getQueueParentPaths( "/a/b/c" ) ) );
+ logger.info( mapToFormattedJsonString( getQueueParentPaths( "/" ) ) );
+ }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/core/src/test/java/org/apache/usergrid/mq/QueuePathsTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/mq/QueuePathsTest.java b/stack/core/src/test/java/org/apache/usergrid/mq/QueuePathsTest.java
deleted file mode 100644
index 86dc8bc..0000000
--- a/stack/core/src/test/java/org/apache/usergrid/mq/QueuePathsTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.usergrid.mq;
-
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.usergrid.mq.Queue.getQueueParentPaths;
-import static org.apache.usergrid.mq.Queue.normalizeQueuePath;
-import static org.apache.usergrid.utils.JsonUtils.mapToFormattedJsonString;
-
-
-
-public class QueuePathsTest {
- private static final Logger logger = LoggerFactory.getLogger( QueuePathsTest.class );
-
-
- @Test
- // TODO - why does this test case not have assertions to test results?
- // tests should not be written like this: what's the point? If it's
- // code coverage this is still bad.
- public void testPaths() throws Exception {
- logger.info( normalizeQueuePath( "a/b/c" ) );
- logger.info( normalizeQueuePath( "a/b/c/" ) );
- logger.info( normalizeQueuePath( "/a/b/c" ) );
- logger.info( normalizeQueuePath( "/////a/b/c" ) );
- logger.info( normalizeQueuePath( "/" ) );
-
- logger.info( mapToFormattedJsonString( getQueueParentPaths( "/a/b/c" ) ) );
- logger.info( mapToFormattedJsonString( getQueueParentPaths( "/" ) ) );
- }
-}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueue.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueue.java
new file mode 100644
index 0000000..c12dc41
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueue.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.queue;
+
+
+public class LegacyQueue {
+ private final String url;
+
+ public LegacyQueue(String url) {
+ this.url = url;
+ }
+
+ public String getUrl(){
+ return url;
+ }
+
+ public boolean isEmpty(){
+ return url == null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
new file mode 100644
index 0000000..907eec2
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
@@ -0,0 +1,106 @@
+package org.apache.usergrid.persistence.queue;
+
+import org.safehaus.guicyfig.Default;
+import org.safehaus.guicyfig.FigSingleton;
+import org.safehaus.guicyfig.GuicyFig;
+import org.safehaus.guicyfig.Key;
+
+@FigSingleton
+public interface LegacyQueueFig extends GuicyFig {
+
+ /**
+ * Any region value string must exactly match the region names specified on this page:
+ *
+ * http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html*
+ */
+
+
+ /**
+ * Primary region to use for Amazon queues.
+ */
+ @Key( "usergrid.queue.region" )
+ @Default("us-east-1")
+ String getPrimaryRegion();
+
+ /**
+ * Flag to determine if Usergrid should use a multi-region Amazon queue
+ * implementation.
+ */
+ @Key( "usergrid.queue.multiregion" )
+ @Default("false")
+ boolean isMultiRegion();
+
+ /**
+ * Comma-separated list of one or more Amazon regions to use if multiregion
+ * is set to true.
+ */
+ @Key( "usergrid.queue.regionList" )
+ @Default("us-east-1")
+ String getRegionList();
+
+
+ /**
+ * Set the amount of time (in minutes) to retain messages in a queue.
+ * 1209600 = 14 days (maximum retention period)
+ */
+ @Key( "usergrid.queue.retention" )
+ @Default("1209600")
+ String getRetentionPeriod();
+
+ /**
+ * Set the amount of time (in minutes) to retain messages in a dead letter queue.
+ * 1209600 = 14 days (maximum retention period)
+ */
+ @Key( "usergrid.queue.deadletter.retention" )
+ @Default("1209600")
+ String getDeadletterRetentionPeriod();
+
+ /**
+ * The maximum number of attempts to attempt to deliver before failing into the DLQ
+ */
+ @Key( "usergrid.queue.deliveryLimit" )
+ @Default("10")
+ String getQueueDeliveryLimit();
+
+ @Key("usergrid.use.default.queue")
+ @Default("false")
+ boolean overrideQueueForDefault();
+
+ @Key("usergrid.queue.publish.threads")
+ @Default("100")
+ int getAsyncMaxThreads();
+
+ // current msg size 1.2kb * 850000 = 1.02 GB (let this default be the most we'll queue in heap)
+ @Key("usergrid.queue.publish.queuesize")
+ @Default("250000")
+ int getAsyncQueueSize();
+
+ /**
+ * Set the visibility timeout (in milliseconds) for faster retries
+ * @return
+ */
+ @Key( "usergrid.queue.visibilityTimeout" )
+ @Default("5000") // 5 seconds
+ int getVisibilityTimeout();
+
+ @Key( "usergrid.queue.localquorum.timeout")
+ @Default("30000") // 30 seconds
+ int getLocalQuorumTimeout();
+
+ @Key( "usergrid.queue.client.connection.timeout")
+ @Default( "5000" ) // 5 seconds
+ int getQueueClientConnectionTimeout();
+
+ @Key( "usergrid.queue.client.socket.timeout")
+ @Default( "50000" ) // 50 seconds
+ int getQueueClientSocketTimeout();
+
+ @Key( "usergrid.queue.poll.timeout")
+ @Default( "10000" ) // 10 seconds
+ int getQueuePollTimeout();
+
+ @Key( "usergrid.queue.quorum.fallback")
+ @Default("false") // 30 seconds
+ boolean getQuorumFallback();
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java
new file mode 100644
index 0000000..053dd36
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.queue;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**ctor
+ * Manages queues for usergrid. Current implementation is sqs based.
+ */
+public interface LegacyQueueManager {
+
+ /**
+ * Read messages from queue
+ * @param limit
+ * @param klass class to cast the return from
+ * @return List of Queue Messages
+ */
+ List<LegacyQueueMessage> getMessages(int limit, Class klass);
+
+ /**
+ * get the queue depth
+ * @return
+ */
+ long getQueueDepth();
+
+ /**
+ * Commit the transaction
+ * @param queueMessage
+ */
+ void commitMessage( LegacyQueueMessage queueMessage);
+
+ /**
+ * commit multiple messages
+ * @param queueMessages
+ */
+ void commitMessages( List<LegacyQueueMessage> queueMessages);
+
+ /**
+ * send messages to queue
+ * @param bodies body objects must be serializable
+ * @throws IOException
+ */
+ void sendMessages(List bodies) throws IOException;
+
+ /**
+ * send a message to queue
+ * @param body
+ * @throws IOException
+ */
+ <T extends Serializable> void sendMessage(T body)throws IOException;
+
+ /**
+ * Send a messae to the topic to be sent to other queues
+ * @param body
+ */
+ <T extends Serializable> void sendMessageToTopic(T body) throws IOException;
+
+ /**
+ * purge messages
+ */
+ void deleteQueue();
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerFactory.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerFactory.java
new file mode 100644
index 0000000..53afc22
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerFactory.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.queue;
+
+public interface LegacyQueueManagerFactory {
+ public LegacyQueueManager getQueueManager(final LegacyQueueScope scope );
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerInternalFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerInternalFactory.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerInternalFactory.java
new file mode 100644
index 0000000..986f211
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerInternalFactory.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.queue;
+
+/**
+ * QueueManagerInternal guice factory
+ */
+public interface LegacyQueueManagerInternalFactory {
+ LegacyQueueManager getQueueManager(final LegacyQueueScope scope );
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueMessage.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueMessage.java
new file mode 100644
index 0000000..939443d
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueMessage.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.queue;
+
+public class LegacyQueueMessage {
+ private final Object body;
+ private final String messageId;
+ private final String handle;
+ private final String type;
+ private String stringBody;
+ private int receiveCount;
+
+
+ public LegacyQueueMessage(String messageId, String handle, Object body, String type) {
+ this.body = body;
+ this.messageId = messageId;
+ this.handle = handle;
+ this.type = type;
+ this.stringBody = "";
+ this.receiveCount = 1; // we'll always receive once if we're taking it off the in mem or AWS queue
+ }
+
+ public String getHandle() {
+ return handle;
+ }
+
+ public Object getBody(){
+ return body;
+ }
+
+ public String getMessageId() {
+ return messageId;
+ }
+
+
+ public String getType() {
+ return type;
+ }
+
+ public void setStringBody(String stringBody) {
+ this.stringBody = stringBody;
+ }
+
+ public String getStringBody() {
+ return stringBody;
+ }
+
+ public void setReceiveCount(int receiveCount){
+ this.receiveCount = receiveCount;
+ }
+
+ public int getReceiveCount(){
+ return receiveCount;
+ }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueScope.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueScope.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueScope.java
new file mode 100644
index 0000000..1f932b2
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueScope.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.queue;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+
+public interface LegacyQueueScope {
+
+ /**
+ * LOCAL will create a SNS topic with a queue subscription in a single AWS region.
+ * ALL will create SNS topics and queue subscriptions in ALL AWS regions.
+ */
+ enum RegionImplementation {
+ LOCAL,
+ ALL
+ }
+
+ /**
+ * Get the name of the the map
+ * @return
+ */
+ public String getName();
+
+ /**
+ * Get the Usergrid region enum
+ */
+ public RegionImplementation getRegionImplementation();
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
index 1f4261a..630c2e7 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
@@ -34,17 +34,17 @@ import java.util.concurrent.TimeUnit;
/**
* Default queue manager implementation, uses in memory linked queue
*/
-public class LocalQueueManager implements QueueManager {
+public class LocalQueueManager implements LegacyQueueManager {
private static final Logger logger = LoggerFactory.getLogger(LocalQueueManager.class);
- public ArrayBlockingQueue<QueueMessage> queue = new ArrayBlockingQueue<>(10000);
+ public ArrayBlockingQueue<LegacyQueueMessage> queue = new ArrayBlockingQueue<>(10000);
@Override
- public List<QueueMessage> getMessages(int limit, Class klass) {
- List<QueueMessage> returnQueue = new ArrayList<>();
+ public List<LegacyQueueMessage> getMessages(int limit, Class klass) {
+ List<LegacyQueueMessage> returnQueue = new ArrayList<>();
try {
- QueueMessage message=null;
+ LegacyQueueMessage message=null;
int count = 5;
do {
message = queue.poll(100, TimeUnit.MILLISECONDS);
@@ -64,11 +64,11 @@ public class LocalQueueManager implements QueueManager {
}
@Override
- public void commitMessage(QueueMessage queueMessage) {
+ public void commitMessage(LegacyQueueMessage queueMessage) {
}
@Override
- public void commitMessages(List<QueueMessage> queueMessages) {
+ public void commitMessages(List<LegacyQueueMessage> queueMessages) {
}
@Override
@@ -76,7 +76,7 @@ public class LocalQueueManager implements QueueManager {
for(Object body : bodies){
String uuid = UUID.randomUUID().toString();
try {
- queue.put(new QueueMessage(uuid, "handle_" + uuid, body, "put type here"));
+ queue.put(new LegacyQueueMessage(uuid, "handle_" + uuid, body, "put type here"));
}catch (InterruptedException ie){
throw new RuntimeException(ie);
}
@@ -88,7 +88,7 @@ public class LocalQueueManager implements QueueManager {
public <T extends Serializable> void sendMessage( final T body ) throws IOException {
String uuid = UUID.randomUUID().toString();
try {
- queue.offer(new QueueMessage(uuid, "handle_" + uuid, body, "put type here"),5000,TimeUnit.MILLISECONDS);
+ queue.offer(new LegacyQueueMessage(uuid, "handle_" + uuid, body, "put type here"),5000,TimeUnit.MILLISECONDS);
}catch (InterruptedException ie){
throw new RuntimeException(ie);
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/Queue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/Queue.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/Queue.java
deleted file mode 100644
index 24070d0..0000000
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/Queue.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.queue;
-
-
-public class Queue {
- private final String url;
-
- public Queue(String url) {
- this.url = url;
- }
-
- public String getUrl(){
- return url;
- }
-
- public boolean isEmpty(){
- return url == null;
- }
-}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
deleted file mode 100644
index 74912ae..0000000
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
+++ /dev/null
@@ -1,106 +0,0 @@
-package org.apache.usergrid.persistence.queue;
-
-import org.safehaus.guicyfig.Default;
-import org.safehaus.guicyfig.FigSingleton;
-import org.safehaus.guicyfig.GuicyFig;
-import org.safehaus.guicyfig.Key;
-
-@FigSingleton
-public interface QueueFig extends GuicyFig {
-
- /**
- * Any region value string must exactly match the region names specified on this page:
- *
- * http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html*
- */
-
-
- /**
- * Primary region to use for Amazon queues.
- */
- @Key( "usergrid.queue.region" )
- @Default("us-east-1")
- String getPrimaryRegion();
-
- /**
- * Flag to determine if Usergrid should use a multi-region Amazon queue
- * implementation.
- */
- @Key( "usergrid.queue.multiregion" )
- @Default("false")
- boolean isMultiRegion();
-
- /**
- * Comma-separated list of one or more Amazon regions to use if multiregion
- * is set to true.
- */
- @Key( "usergrid.queue.regionList" )
- @Default("us-east-1")
- String getRegionList();
-
-
- /**
- * Set the amount of time (in minutes) to retain messages in a queue.
- * 1209600 = 14 days (maximum retention period)
- */
- @Key( "usergrid.queue.retention" )
- @Default("1209600")
- String getRetentionPeriod();
-
- /**
- * Set the amount of time (in minutes) to retain messages in a dead letter queue.
- * 1209600 = 14 days (maximum retention period)
- */
- @Key( "usergrid.queue.deadletter.retention" )
- @Default("1209600")
- String getDeadletterRetentionPeriod();
-
- /**
- * The maximum number of attempts to attempt to deliver before failing into the DLQ
- */
- @Key( "usergrid.queue.deliveryLimit" )
- @Default("10")
- String getQueueDeliveryLimit();
-
- @Key("usergrid.use.default.queue")
- @Default("false")
- boolean overrideQueueForDefault();
-
- @Key("usergrid.queue.publish.threads")
- @Default("100")
- int getAsyncMaxThreads();
-
- // current msg size 1.2kb * 850000 = 1.02 GB (let this default be the most we'll queue in heap)
- @Key("usergrid.queue.publish.queuesize")
- @Default("250000")
- int getAsyncQueueSize();
-
- /**
- * Set the visibility timeout (in milliseconds) for faster retries
- * @return
- */
- @Key( "usergrid.queue.visibilityTimeout" )
- @Default("5000") // 5 seconds
- int getVisibilityTimeout();
-
- @Key( "usergrid.queue.localquorum.timeout")
- @Default("30000") // 30 seconds
- int getLocalQuorumTimeout();
-
- @Key( "usergrid.queue.client.connection.timeout")
- @Default( "5000" ) // 5 seconds
- int getQueueClientConnectionTimeout();
-
- @Key( "usergrid.queue.client.socket.timeout")
- @Default( "50000" ) // 50 seconds
- int getQueueClientSocketTimeout();
-
- @Key( "usergrid.queue.poll.timeout")
- @Default( "10000" ) // 10 seconds
- int getQueuePollTimeout();
-
- @Key( "usergrid.queue.quorum.fallback")
- @Default("false") // 30 seconds
- boolean getQuorumFallback();
-
-}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
deleted file mode 100644
index d2e29cb..0000000
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.queue;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-
-/**ctor
- * Manages queues for usergrid. Current implementation is sqs based.
- */
-public interface QueueManager {
-
- /**
- * Read messages from queue
- * @param limit
- * @param klass class to cast the return from
- * @return List of Queue Messages
- */
- List<QueueMessage> getMessages(int limit, Class klass);
-
- /**
- * get the queue depth
- * @return
- */
- long getQueueDepth();
-
- /**
- * Commit the transaction
- * @param queueMessage
- */
- void commitMessage( QueueMessage queueMessage);
-
- /**
- * commit multiple messages
- * @param queueMessages
- */
- void commitMessages( List<QueueMessage> queueMessages);
-
- /**
- * send messages to queue
- * @param bodies body objects must be serializable
- * @throws IOException
- */
- void sendMessages(List bodies) throws IOException;
-
- /**
- * send a message to queue
- * @param body
- * @throws IOException
- */
- <T extends Serializable> void sendMessage(T body)throws IOException;
-
- /**
- * Send a messae to the topic to be sent to other queues
- * @param body
- */
- <T extends Serializable> void sendMessageToTopic(T body) throws IOException;
-
- /**
- * purge messages
- */
- void deleteQueue();
-}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerFactory.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerFactory.java
deleted file mode 100644
index 4cdb5e2..0000000
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerFactory.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.queue;
-
-public interface QueueManagerFactory {
- public QueueManager getQueueManager( final QueueScope scope );
-
-}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerInternalFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerInternalFactory.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerInternalFactory.java
deleted file mode 100644
index 119c064..0000000
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerInternalFactory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. The ASF licenses this file to You
- * * under the Apache License, Version 2.0 (the "License"); you may not
- * * use this file except in compliance with the License.
- * * You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License. For additional information regarding
- * * copyright in this work, please see the NOTICE file in the top level
- * * directory of this distribution.
- *
- */
-package org.apache.usergrid.persistence.queue;
-
-/**
- * QueueManagerInternal guice factory
- */
-public interface QueueManagerInternalFactory {
- QueueManager getQueueManager( final QueueScope scope );
-
-}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
deleted file mode 100644
index f8ce6ef..0000000
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.queue;
-
-public class QueueMessage {
- private final Object body;
- private final String messageId;
- private final String handle;
- private final String type;
- private String stringBody;
- private int receiveCount;
-
-
- public QueueMessage(String messageId, String handle, Object body,String type) {
- this.body = body;
- this.messageId = messageId;
- this.handle = handle;
- this.type = type;
- this.stringBody = "";
- this.receiveCount = 1; // we'll always receive once if we're taking it off the in mem or AWS queue
- }
-
- public String getHandle() {
- return handle;
- }
-
- public Object getBody(){
- return body;
- }
-
- public String getMessageId() {
- return messageId;
- }
-
-
- public String getType() {
- return type;
- }
-
- public void setStringBody(String stringBody) {
- this.stringBody = stringBody;
- }
-
- public String getStringBody() {
- return stringBody;
- }
-
- public void setReceiveCount(int receiveCount){
- this.receiveCount = receiveCount;
- }
-
- public int getReceiveCount(){
- return receiveCount;
- }
-}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java
deleted file mode 100644
index f58584f..0000000
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.queue;
-
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-
-
-public interface QueueScope {
-
- /**
- * LOCAL will create a SNS topic with a queue subscription in a single AWS region.
- * ALL will create SNS topics and queue subscriptions in ALL AWS regions.
- */
- enum RegionImplementation {
- LOCAL,
- ALL
- }
-
- /**
- * Get the name of the the map
- * @return
- */
- public String getName();
-
- /**
- * Get the Usergrid region enum
- */
- public RegionImplementation getRegionImplementation();
-}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
index caf61bf..6d62da0 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
@@ -18,14 +18,14 @@
package org.apache.usergrid.persistence.queue.guice;
-import org.apache.usergrid.persistence.queue.QueueManagerInternalFactory;
+import org.apache.usergrid.persistence.queue.LegacyQueueManagerInternalFactory;
import org.apache.usergrid.persistence.queue.impl.QueueManagerFactoryImpl;
import org.apache.usergrid.persistence.queue.impl.SNSQueueManagerImpl;
import org.safehaus.guicyfig.GuicyFigModule;
-import org.apache.usergrid.persistence.queue.QueueFig;
-import org.apache.usergrid.persistence.queue.QueueManager;
-import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+import org.apache.usergrid.persistence.queue.LegacyQueueFig;
+import org.apache.usergrid.persistence.queue.LegacyQueueManager;
+import org.apache.usergrid.persistence.queue.LegacyQueueManagerFactory;
import com.google.inject.AbstractModule;
import com.google.inject.assistedinject.FactoryModuleBuilder;
@@ -42,11 +42,11 @@ public class QueueModule extends AbstractModule {
@Override
protected void configure() {
- install(new GuicyFigModule(QueueFig.class));
+ install(new GuicyFigModule(LegacyQueueFig.class));
- bind(QueueManagerFactory.class).to(QueueManagerFactoryImpl.class);
- install(new FactoryModuleBuilder().implement(QueueManager.class, SNSQueueManagerImpl.class)
- .build(QueueManagerInternalFactory.class));
+ bind(LegacyQueueManagerFactory.class).to(QueueManagerFactoryImpl.class);
+ install(new FactoryModuleBuilder().implement(LegacyQueueManager.class, SNSQueueManagerImpl.class)
+ .build(LegacyQueueManagerInternalFactory.class));
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/LegacyQueueScopeImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/LegacyQueueScopeImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/LegacyQueueScopeImpl.java
new file mode 100644
index 0000000..51d6c03
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/LegacyQueueScopeImpl.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.queue.impl;
+
+import org.apache.usergrid.persistence.queue.LegacyQueueScope;
+
+public class LegacyQueueScopeImpl implements LegacyQueueScope {
+
+ private final String name;
+ private final RegionImplementation regionImpl;
+
+ public LegacyQueueScopeImpl(final String name, final RegionImplementation regionImpl) {
+ this.name = name;
+ this.regionImpl = regionImpl;
+ }
+
+
+
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public RegionImplementation getRegionImplementation() {return regionImpl;}
+
+ @Override
+ public boolean equals( final Object o ) {
+ if ( this == o ) {
+ return true;
+ }
+ if ( !( o instanceof LegacyQueueScopeImpl) ) {
+ return false;
+ }
+
+ final LegacyQueueScopeImpl queueScope = (LegacyQueueScopeImpl) o;
+
+ if ( !name.equals( queueScope.name ) ) {
+ return false;
+ }
+
+
+ return true;
+ }
+
+
+ @Override
+ public int hashCode() {
+ return name.hashCode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
index 93b2fb2..2d51903 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
@@ -33,26 +33,26 @@ import java.util.concurrent.ExecutionException;
* manages whether we take in an external in memory override for queues.
*/
@Singleton
-public class QueueManagerFactoryImpl implements QueueManagerFactory {
+public class QueueManagerFactoryImpl implements LegacyQueueManagerFactory {
private static final Logger logger = LoggerFactory.getLogger( QueueManagerFactoryImpl.class );
- private final QueueFig queueFig;
- private final QueueManagerInternalFactory queuemanagerInternalFactory;
- private final Map<String,QueueManager> defaultManager;
- private final LoadingCache<QueueScope, QueueManager> queueManager =
+ private final LegacyQueueFig queueFig;
+ private final LegacyQueueManagerInternalFactory queuemanagerInternalFactory;
+ private final Map<String,LegacyQueueManager> defaultManager;
+ private final LoadingCache<LegacyQueueScope, LegacyQueueManager> queueManager =
CacheBuilder
.newBuilder()
.initialCapacity(5)
.maximumSize(100)
- .build(new CacheLoader<QueueScope, QueueManager>() {
+ .build(new CacheLoader<LegacyQueueScope, LegacyQueueManager>() {
@Override
- public QueueManager load( QueueScope scope ) throws Exception {
+ public LegacyQueueManager load(LegacyQueueScope scope ) throws Exception {
if ( queueFig.overrideQueueForDefault() ){
- QueueManager manager = defaultManager.get( scope.getName() );
+ LegacyQueueManager manager = defaultManager.get( scope.getName() );
if ( manager == null ) {
manager = new LocalQueueManager();
defaultManager.put( scope.getName(), manager );
@@ -67,14 +67,14 @@ public class QueueManagerFactoryImpl implements QueueManagerFactory {
});
@Inject
- public QueueManagerFactoryImpl(final QueueFig queueFig, final QueueManagerInternalFactory queuemanagerInternalFactory){
+ public QueueManagerFactoryImpl(final LegacyQueueFig queueFig, final LegacyQueueManagerInternalFactory queuemanagerInternalFactory){
this.queueFig = queueFig;
this.queuemanagerInternalFactory = queuemanagerInternalFactory;
this.defaultManager = new HashMap<>(10);
}
@Override
- public QueueManager getQueueManager(QueueScope scope) {
+ public LegacyQueueManager getQueueManager(LegacyQueueScope scope) {
try {
return queueManager.get(scope);
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java
deleted file mode 100644
index fa50b49..0000000
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.queue.impl;
-
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.queue.QueueScope;
-
-public class QueueScopeImpl implements QueueScope {
-
- private final String name;
- private final RegionImplementation regionImpl;
-
- public QueueScopeImpl( final String name, final RegionImplementation regionImpl) {
- this.name = name;
- this.regionImpl = regionImpl;
- }
-
-
-
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public RegionImplementation getRegionImplementation() {return regionImpl;}
-
- @Override
- public boolean equals( final Object o ) {
- if ( this == o ) {
- return true;
- }
- if ( !( o instanceof QueueScopeImpl ) ) {
- return false;
- }
-
- final QueueScopeImpl queueScope = ( QueueScopeImpl ) o;
-
- if ( !name.equals( queueScope.name ) ) {
- return false;
- }
-
-
- return true;
- }
-
-
- @Override
- public int hashCode() {
- return name.hashCode();
- }
-}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index ae11517..6d4e0c4 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -35,11 +35,11 @@ import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.core.CassandraFig;
import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory;
import org.apache.usergrid.persistence.core.guicyfig.ClusterFig;
-import org.apache.usergrid.persistence.queue.Queue;
-import org.apache.usergrid.persistence.queue.QueueFig;
-import org.apache.usergrid.persistence.queue.QueueManager;
-import org.apache.usergrid.persistence.queue.QueueMessage;
-import org.apache.usergrid.persistence.queue.QueueScope;
+import org.apache.usergrid.persistence.queue.LegacyQueue;
+import org.apache.usergrid.persistence.queue.LegacyQueueFig;
+import org.apache.usergrid.persistence.queue.LegacyQueueManager;
+import org.apache.usergrid.persistence.queue.LegacyQueueMessage;
+import org.apache.usergrid.persistence.queue.LegacyQueueScope;
import org.apache.usergrid.persistence.queue.util.AmazonNotificationUtils;
import com.amazonaws.AmazonServiceException;
@@ -80,12 +80,12 @@ import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
-public class SNSQueueManagerImpl implements QueueManager {
+public class SNSQueueManagerImpl implements LegacyQueueManager {
private static final Logger logger = LoggerFactory.getLogger( SNSQueueManagerImpl.class );
- private final QueueScope scope;
- private final QueueFig fig;
+ private final LegacyQueueScope scope;
+ private final LegacyQueueFig fig;
private final ClusterFig clusterFig;
private final CassandraFig cassandraFig;
private final ClientConfiguration clientConfiguration;
@@ -121,16 +121,16 @@ public class SNSQueueManagerImpl implements QueueManager {
}
} );
- private final LoadingCache<String, Queue> readQueueUrlMap =
- CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<String, Queue>() {
+ private final LoadingCache<String, LegacyQueue> readQueueUrlMap =
+ CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<String, LegacyQueue>() {
@Override
- public Queue load( String queueName ) throws Exception {
+ public LegacyQueue load(String queueName ) throws Exception {
- Queue queue = null;
+ LegacyQueue queue = null;
try {
GetQueueUrlResult result = sqs.getQueueUrl( queueName );
- queue = new Queue( result.getQueueUrl() );
+ queue = new LegacyQueue( result.getQueueUrl() );
}
catch ( QueueDoesNotExistException queueDoesNotExistException ) {
logger.error( "Queue {} does not exist, will create", queueName );
@@ -142,7 +142,7 @@ public class SNSQueueManagerImpl implements QueueManager {
if ( queue == null ) {
String url = AmazonNotificationUtils.createQueue( sqs, queueName, fig );
- queue = new Queue( url );
+ queue = new LegacyQueue( url );
}
setupTopics( queueName );
@@ -153,8 +153,8 @@ public class SNSQueueManagerImpl implements QueueManager {
@Inject
- public SNSQueueManagerImpl( @Assisted QueueScope scope, QueueFig fig, ClusterFig clusterFig,
- CassandraFig cassandraFig, QueueFig queueFig ) {
+ public SNSQueueManagerImpl(@Assisted LegacyQueueScope scope, LegacyQueueFig fig, ClusterFig clusterFig,
+ CassandraFig cassandraFig, LegacyQueueFig queueFig ) {
this.scope = scope;
this.fig = fig;
this.clusterFig = clusterFig;
@@ -232,7 +232,7 @@ public class SNSQueueManagerImpl implements QueueManager {
"Unable to subscribe PRIMARY queue=[{}] to topic=[{}]", queueUrl, primaryTopicArn, e );
}
- if ( fig.isMultiRegion() && scope.getRegionImplementation() == QueueScope.RegionImplementation.ALL ) {
+ if ( fig.isMultiRegion() && scope.getRegionImplementation() == LegacyQueueScope.RegionImplementation.ALL ) {
String multiRegion = fig.getRegionList();
@@ -391,7 +391,7 @@ public class SNSQueueManagerImpl implements QueueManager {
}
- public Queue getReadQueue() {
+ public LegacyQueue getReadQueue() {
String queueName = getName();
try {
@@ -414,7 +414,7 @@ public class SNSQueueManagerImpl implements QueueManager {
@Override
- public List<QueueMessage> getMessages(final int limit, final Class klass) {
+ public List<LegacyQueueMessage> getMessages(final int limit, final Class klass) {
if ( sqs == null ) {
logger.error( "SQS is null - was not initialized properly" );
@@ -457,7 +457,7 @@ public class SNSQueueManagerImpl implements QueueManager {
logger.trace( "Received {} messages from {}", messages.size(), url );
}
- List<QueueMessage> queueMessages = new ArrayList<>( messages.size() );
+ List<LegacyQueueMessage> queueMessages = new ArrayList<>( messages.size() );
for ( Message message : messages ) {
@@ -487,7 +487,7 @@ public class SNSQueueManagerImpl implements QueueManager {
throw new RuntimeException( e );
}
- QueueMessage queueMessage = new QueueMessage( message.getMessageId(), message.getReceiptHandle(), payload,
+ LegacyQueueMessage queueMessage = new LegacyQueueMessage( message.getMessageId(), message.getReceiptHandle(), payload,
message.getAttributes().get( "type" ) );
queueMessage.setStringBody( originalBody );
int receiveCount = Integer.valueOf(message.getAttributes().get("ApproximateReceiveCount"));
@@ -634,7 +634,7 @@ public class SNSQueueManagerImpl implements QueueManager {
@Override
- public void commitMessage( final QueueMessage queueMessage ) {
+ public void commitMessage( final LegacyQueueMessage queueMessage ) {
String url = getReadQueue().getUrl();
if ( logger.isTraceEnabled() ) {
logger.trace( "Commit message {} to queue {}", queueMessage.getMessageId(), url );
@@ -646,7 +646,7 @@ public class SNSQueueManagerImpl implements QueueManager {
@Override
- public void commitMessages( final List<QueueMessage> queueMessages ) {
+ public void commitMessages( final List<LegacyQueueMessage> queueMessages ) {
String url = getReadQueue().getUrl();
if ( logger.isTraceEnabled() ) {
@@ -655,7 +655,7 @@ public class SNSQueueManagerImpl implements QueueManager {
List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
- for ( QueueMessage message : queueMessages ) {
+ for ( LegacyQueueMessage message : queueMessages ) {
entries.add( new DeleteMessageBatchRequestEntry( message.getMessageId(), message.getHandle() ) );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
index b6b1aaa..56bef91 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
@@ -9,7 +9,7 @@ import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.usergrid.persistence.queue.QueueFig;
+import org.apache.usergrid.persistence.queue.LegacyQueueFig;
import com.amazonaws.auth.policy.Condition;
import com.amazonaws.auth.policy.Policy;
@@ -40,7 +40,7 @@ public class AmazonNotificationUtils {
private static final Logger logger = LoggerFactory.getLogger( AmazonNotificationUtils.class );
- public static String createQueue( final AmazonSQSClient sqs, final String queueName, final QueueFig fig )
+ public static String createQueue( final AmazonSQSClient sqs, final String queueName, final LegacyQueueFig fig )
throws Exception {
final String deadletterQueueName = String.format( "%s_dead", queueName );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
new file mode 100644
index 0000000..69655e5
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.queue;
+
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule;
+import org.apache.usergrid.persistence.core.test.ITRunner;
+import org.apache.usergrid.persistence.core.test.UseModules;
+import org.apache.usergrid.persistence.queue.guice.TestQueueModule;
+import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl;
+
+import com.google.inject.Inject;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+
+@RunWith( ITRunner.class )
+@UseModules( { TestQueueModule.class } )
+public class LegacyQueueManagerTest {
+
+ @Inject
+ protected LegacyQueueFig queueFig;
+ @Inject
+ protected LegacyQueueManagerFactory qmf;
+
+ /**
+ * Mark tests as ignored if no AWS creds are present
+ */
+ @Rule
+ public NoAWSCredsRule awsCredsRule = new NoAWSCredsRule();
+
+
+ protected LegacyQueueScope scope;
+ private LegacyQueueManager qm;
+
+ public static long queueSeed = System.currentTimeMillis();
+
+
+ @Before
+ public void mockApp() {
+
+ this.scope = new LegacyQueueScopeImpl( "testQueue"+queueSeed++, LegacyQueueScope.RegionImplementation.LOCAL);
+ qm = qmf.getQueueManager(scope);
+ }
+
+ @org.junit.After
+ public void cleanup(){
+ qm.deleteQueue();
+ }
+
+
+ @Test
+ public void send() throws Exception{
+ String value = "bodytest";
+ qm.sendMessage(value);
+ List<LegacyQueueMessage> messageList = qm.getMessages(1, String.class);
+ assertTrue(messageList.size() >= 1);
+ for(LegacyQueueMessage message : messageList){
+ assertTrue(message.getBody().equals(value));
+ qm.commitMessage(message);
+ }
+
+ messageList = qm.getMessages(1, String.class);
+ assertTrue(messageList.size() <= 0);
+
+ }
+
+ @Test
+ public void sendMore() throws Exception{
+ HashMap<String,String> values = new HashMap<>();
+ values.put("test","Test");
+
+ List<Map<String,String>> bodies = new ArrayList<>();
+ bodies.add(values);
+ qm.sendMessages(bodies);
+ List<LegacyQueueMessage> messageList = qm.getMessages(1, values.getClass());
+ assertTrue(messageList.size() >= 1);
+ for(LegacyQueueMessage message : messageList){
+ assertTrue(message.getBody().equals(values));
+ }
+ qm.commitMessages(messageList);
+
+ messageList = qm.getMessages(1, values.getClass());
+ assertTrue(messageList.size() <= 0);
+
+ }
+
+ @Test
+ public void queueSize() throws Exception{
+ HashMap<String,String> values = new HashMap<>();
+ values.put("test", "Test");
+
+ List<Map<String,String>> bodies = new ArrayList<>();
+ bodies.add(values);
+ long initialDepth = qm.getQueueDepth();
+ qm.sendMessages(bodies);
+ long depth = 0;
+ for(int i=0; i<10;i++){
+ depth = qm.getQueueDepth();
+ if(depth>0){
+ break;
+ }
+ Thread.sleep(1000);
+ }
+ assertTrue(depth>0);
+
+ List<LegacyQueueMessage> messageList = qm.getMessages(10, values.getClass());
+ assertTrue(messageList.size() <= 500);
+ for(LegacyQueueMessage message : messageList){
+ assertTrue(message.getBody().equals(values));
+ }
+ if(messageList.size()>0) {
+ qm.commitMessages(messageList);
+ }
+ for(int i=0; i<10;i++){
+ depth = qm.getQueueDepth();
+ if(depth==initialDepth){
+ break;
+ }
+ Thread.sleep(1000);
+ }
+ assertEquals(initialDepth, depth);
+ }
+
+
+
+}