You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/10/03 21:10:25 UTC
[3/4] git commit: added fig
added fig
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/92cac3d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/92cac3d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/92cac3d2
Branch: refs/heads/sqs_queues
Commit: 92cac3d262a8ebe1d25c21f132c50e670c20027c
Parents: 95d3a8c
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Oct 3 12:26:57 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Oct 3 12:26:57 2014 -0600
----------------------------------------------------------------------
stack/corepersistence/queue/pom.xml | 22 +-----
.../usergrid/persistence/queue/Queue.java | 31 ++++++++
.../usergrid/persistence/queue/QueueFig.java | 16 ++++
.../persistence/queue/QueueManager.java | 24 ++++++
.../persistence/queue/QueueMessage.java | 43 +++++++++++
.../persistence/queue/guice/QueueModule.java | 11 ++-
.../queue/impl/QueueManagerImpl.java | 79 +++++++++++++++++++-
.../persistence/queue/QueueManagerTest.java | 13 ++--
8 files changed, 203 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/92cac3d2/stack/corepersistence/queue/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/pom.xml b/stack/corepersistence/queue/pom.xml
index 09d66c6..6795248 100644
--- a/stack/corepersistence/queue/pom.xml
+++ b/stack/corepersistence/queue/pom.xml
@@ -82,28 +82,10 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
- <version>1.0.002</version>
+ <version>1.8.11</version>
</dependency>
</dependencies>
- <build>
- <plugins>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <!-- We want to exclude any chop tests or stress tests. They kill the embedded cassandra and
- aren't intended to be part of the build process-->
- <excludes>
- <exclude>**/*ChopTest.java</exclude>
- <exclude>**/*LoadTest.java</exclude>
- <exclude>**/*StressTest.java</exclude>
- </excludes>
- </configuration>
- </plugin>
-
- </plugins>
- </build>
+
</project>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/92cac3d2/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/Queue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/Queue.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/Queue.java
new file mode 100644
index 0000000..2cc49aa
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/Queue.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.queue;
+
+
+public class Queue {
+ private final String url;
+
+ public Queue(String url) {
+ this.url = url;
+ }
+
+ public String getUrl(){
+ return url;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/92cac3d2/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
new file mode 100644
index 0000000..479fb97
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
@@ -0,0 +1,16 @@
+package org.apache.usergrid.persistence.queue;
+
+import org.safehaus.guicyfig.Default;
+import org.safehaus.guicyfig.FigSingleton;
+import org.safehaus.guicyfig.GuicyFig;
+import org.safehaus.guicyfig.Key;
+
+@FigSingleton
+public interface QueueFig extends GuicyFig {
+
+ @Key( "queue.region" )
+ @Default("US_EAST_1")
+ public String getRegion();
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/92cac3d2/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
index 9d8a6aa..e29310b 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
@@ -1,7 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
package org.apache.usergrid.persistence.queue;
+import java.util.List;
+
/**
* Created by ApigeeCorporation on 10/3/14.
*/
public interface QueueManager {
+ Queue createQueue( );
+ Queue getQueue();
+ List<QueueMessage> getMessages(int limit,int timeout);
+ void commitMessage( QueueMessage queueMessage);
+ void commitMessages( List<QueueMessage> queueMessages);
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/92cac3d2/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
new file mode 100644
index 0000000..b07d220
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.queue;
+
+
+public class QueueMessage {
+ private final String body;
+ private final String messageId;
+ private final String handle;
+
+ public QueueMessage(String messageId, String handle, String body) {
+ this.body = body;
+ this.messageId = messageId;
+ this.handle = handle;
+ }
+
+ public String getHandle() {
+ return handle;
+ }
+
+ public String getBody(){
+ return body;
+ }
+
+ public String getMessageId() {
+ return messageId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/92cac3d2/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
index 2168db9..a74754f 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
@@ -17,16 +17,15 @@
*/
package org.apache.usergrid.persistence.queue.guice;
-
-import org.apache.usergrid.persistence.core.migration.Migration;
-
import com.google.inject.AbstractModule;
-import com.google.inject.Key;
import com.google.inject.assistedinject.FactoryModuleBuilder;
-import com.google.inject.multibindings.Multibinder;
+import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
+import org.apache.usergrid.persistence.core.migration.MigrationManagerFig;
+import org.apache.usergrid.persistence.queue.QueueFig;
import org.apache.usergrid.persistence.queue.QueueManager;
import org.apache.usergrid.persistence.queue.QueueManagerFactory;
import org.apache.usergrid.persistence.queue.impl.QueueManagerImpl;
+import org.safehaus.guicyfig.GuicyFigModule;
/**
@@ -40,11 +39,11 @@ public class QueueModule extends AbstractModule {
@Override
protected void configure() {
+ install( new GuicyFigModule( QueueFig.class) );
// create a guice factory for getting our collection manager
install( new FactoryModuleBuilder().implement( QueueManager.class, QueueManagerImpl.class )
.build( QueueManagerFactory.class ) );
-
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/92cac3d2/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerImpl.java
index acf1d80..6547f9f 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerImpl.java
@@ -1,9 +1,84 @@
package org.apache.usergrid.persistence.queue.impl;
-import org.apache.usergrid.persistence.queue.QueueManager;
+import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.sqs.AmazonSQSClient;
+import com.amazonaws.services.sqs.model.*;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import org.apache.usergrid.persistence.queue.*;
+
+import java.util.ArrayList;
+import java.util.List;
public class QueueManagerImpl implements QueueManager {
- public QueueManagerImpl(){
+ private final AmazonSQSClient sqs;
+ private final QueueScope scope;
+ private final QueueFig fig;
+ private Queue queue;
+
+ @Inject
+ public QueueManagerImpl(@Assisted QueueScope scope, QueueFig fig){
+ this.fig = fig;
+ this.scope = scope;
+ EnvironmentVariableCredentialsProvider credsProvider = new EnvironmentVariableCredentialsProvider();
+ this.sqs = new AmazonSQSClient(credsProvider.getCredentials());
+ Regions regions = Regions.fromName(fig.getRegion());
+ Region region = Region.getRegion(regions);
+ sqs.setRegion(region);
+ }
+
+ public Queue createQueue(){
+ CreateQueueRequest createQueueRequest = new CreateQueueRequest()
+ .withQueueName(getName());
+ CreateQueueResult result = sqs.createQueue(createQueueRequest);
+ return new Queue(result.getQueueUrl());
+ }
+
+ private String getName() {
+ return scope.getApplication().getUuid().toString()+ scope.getName();
+ }
+
+ public Queue getQueue(){
+ if(queue == null) {
+ for (String queueUrl : sqs.listQueues().getQueueUrls()) {
+ boolean found = queueUrl.contains(getName());
+ if (found) {
+ queue = new Queue(queueUrl);
+ break;
+ }
+ }
+ }
+ return queue;
+ }
+
+ public List<QueueMessage> getMessages( int limit,int timeout){
+ System.out.println("Receiving messages from MyQueue.\n");
+ ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(getQueue().getUrl());
+ receiveMessageRequest.setMaxNumberOfMessages(limit);
+ receiveMessageRequest.setVisibilityTimeout(timeout);
+ List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
+ List<QueueMessage> queueMessages = new ArrayList<>(messages.size());
+ for (Message message : messages) {
+ QueueMessage queueMessage = new QueueMessage(message.getMessageId(),message.getReceiptHandle(),message.getBody());
+ queueMessages.add(queueMessage);
+ }
+ return queueMessages;
+ }
+
+ public void commitMessage( QueueMessage queueMessage){
+ sqs.deleteMessage(new DeleteMessageRequest()
+ .withQueueUrl(getQueue().getUrl())
+ .withReceiptHandle(queueMessage.getHandle()));
+ }
+ public void commitMessages( List<QueueMessage> queueMessages){
+ List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
+ for(QueueMessage message : queueMessages){
+ entries.add(new DeleteMessageBatchRequestEntry(message.getMessageId(),message.getHandle()));
+ }
+ DeleteMessageBatchRequest request = new DeleteMessageBatchRequest(getQueue().getUrl(),entries);
+ DeleteMessageBatchResult result = sqs.deleteMessageBatch(request);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/92cac3d2/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
index e34d877..8892e3c 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
@@ -48,12 +48,7 @@ public class QueueManagerTest {
@Inject
- @Rule
- public MigrationManagerRule migrationManagerRule;
-
-
- @Inject
- protected QueueManagerFactory mmf;
+ protected QueueManagerFactory qmf;
protected QueueScope scope;
@@ -65,8 +60,10 @@ public class QueueManagerTest {
@Test
- public void writeReadString() {
-
+ public void createQueue() {
+ QueueManager qm = qmf.getQueueManager(scope);
+ qm.createQueue();
+ Queue queue = qm.getQueue();
}
}