You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/10/03 21:56:42 UTC
git commit: moving dependencies
Repository: incubator-usergrid
Updated Branches:
refs/heads/sqs_queues 749bbedf9 -> bfcc33e13
moving dependencies
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/bfcc33e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/bfcc33e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/bfcc33e1
Branch: refs/heads/sqs_queues
Commit: bfcc33e13e625e729aaf7eed35275bf7a676cfcb
Parents: 749bbed
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Oct 3 13:56:25 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Oct 3 13:56:25 2014 -0600
----------------------------------------------------------------------
.../util/UsergridAwsCredentialsProvider.java | 62 -------
.../persistence/queue/guice/QueueModule.java | 6 +-
.../queue/impl/QueueManagerImpl.java | 127 --------------
.../queue/impl/SQSQueueManagerImpl.java | 164 +++++++++++++++++++
4 files changed, 166 insertions(+), 193 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bfcc33e1/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/util/UsergridAwsCredentialsProvider.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/util/UsergridAwsCredentialsProvider.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/util/UsergridAwsCredentialsProvider.java
deleted file mode 100644
index e8608c7..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/util/UsergridAwsCredentialsProvider.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.core.util;
-
-import com.amazonaws.AmazonClientException;
-import com.amazonaws.SDKGlobalConfiguration;
-import com.amazonaws.auth.AWSCredentials;
-import com.amazonaws.auth.AWSCredentialsProvider;
-import org.apache.commons.lang.StringUtils;
-
-
-public class UsergridAwsCredentialsProvider implements AWSCredentialsProvider {
-
- private AWSCredentials creds;
-
- public UsergridAwsCredentialsProvider(){
- init();
- }
-
- private void init() {
- creds = new AWSCredentials() {
- @Override
- public String getAWSAccessKeyId() {
- return StringUtils.trim(System.getProperty(SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR));
- }
-
- @Override
- public String getAWSSecretKey() {
- return StringUtils.trim(System.getProperty(SDKGlobalConfiguration.SECRET_KEY_ENV_VAR));
- }
- };
- if(StringUtils.isEmpty(creds.getAWSAccessKeyId()) || StringUtils.isEmpty(creds.getAWSSecretKey()) ){
- throw new AmazonClientException("could not retrieve credentials from system properties");
- }
- }
-
- @Override
- public AWSCredentials getCredentials() {
- return creds;
- }
-
-
- @Override
- public void refresh() {
- init();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bfcc33e1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
index a74754f..e8fc7c8 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
@@ -19,12 +19,10 @@ package org.apache.usergrid.persistence.queue.guice;
import com.google.inject.AbstractModule;
import com.google.inject.assistedinject.FactoryModuleBuilder;
-import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
-import org.apache.usergrid.persistence.core.migration.MigrationManagerFig;
import org.apache.usergrid.persistence.queue.QueueFig;
import org.apache.usergrid.persistence.queue.QueueManager;
import org.apache.usergrid.persistence.queue.QueueManagerFactory;
-import org.apache.usergrid.persistence.queue.impl.QueueManagerImpl;
+import org.apache.usergrid.persistence.queue.impl.SQSQueueManagerImpl;
import org.safehaus.guicyfig.GuicyFigModule;
@@ -41,7 +39,7 @@ public class QueueModule extends AbstractModule {
install( new GuicyFigModule( QueueFig.class) );
// create a guice factory for getting our collection manager
- install( new FactoryModuleBuilder().implement( QueueManager.class, QueueManagerImpl.class )
+ install( new FactoryModuleBuilder().implement( QueueManager.class, SQSQueueManagerImpl.class )
.build( QueueManagerFactory.class ) );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bfcc33e1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerImpl.java
deleted file mode 100644
index 26d127f..0000000
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerImpl.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.queue.impl;
-
-import com.amazonaws.SDKGlobalConfiguration;
-import com.amazonaws.auth.AWSCredentials;
-import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
-import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
-import com.amazonaws.regions.Region;
-import com.amazonaws.regions.Regions;
-import com.amazonaws.services.sqs.AmazonSQSClient;
-import com.amazonaws.services.sqs.model.*;
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-import org.apache.usergrid.persistence.core.util.UsergridAwsCredentialsProvider;
-import org.apache.usergrid.persistence.queue.*;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class QueueManagerImpl implements QueueManager {
- private final AmazonSQSClient sqs;
- private final QueueScope scope;
- private final QueueFig fig;
- private Queue queue;
-
- @Inject
- public QueueManagerImpl(@Assisted QueueScope scope, QueueFig fig){
- this.fig = fig;
- this.scope = scope;
- UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
- this.sqs = new AmazonSQSClient(ugProvider.getCredentials());
- Regions regions = Regions.fromName(fig.getRegion());
- Region region = Region.getRegion(regions);
- sqs.setRegion(region);
- }
-
-
- public Queue createQueue(){
- CreateQueueRequest createQueueRequest = new CreateQueueRequest()
- .withQueueName(getName());
- CreateQueueResult result = sqs.createQueue(createQueueRequest);
- return new Queue(result.getQueueUrl());
- }
-
- private String getName() {
- return scope.getApplication().getType() + scope.getApplication().getUuid().toString() + scope.getName();
- }
-
- public Queue getQueue(){
- if(queue == null) {
- for (String queueUrl : sqs.listQueues().getQueueUrls()) {
- boolean found = queueUrl.contains(getName());
- if (found) {
- queue = new Queue(queueUrl);
- break;
- }
- }
- }
- if(queue == null) {
- queue = createQueue();
- }
- return queue;
- }
-
- public void sendMessage(String body){
- SendMessageRequest request = new SendMessageRequest(getQueue().getUrl(),body);
- sqs.sendMessage(request);
- }
-
- public void sendMessages(List<String> bodies){
- SendMessageBatchRequest request = new SendMessageBatchRequest(getQueue().getUrl());
- List<SendMessageBatchRequestEntry> entries = new ArrayList<>(bodies.size());
- for(String body : bodies){
- SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry();
- entry.setMessageBody(body);
- entries.add(entry);
- }
- request.setEntries(entries);
- sqs.sendMessageBatch(request);
- }
-
- public List<QueueMessage> getMessages( int limit,int timeout){
- System.out.println("Receiving messages from MyQueue.\n");
- ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(getQueue().getUrl());
- receiveMessageRequest.setMaxNumberOfMessages(limit);
- receiveMessageRequest.setVisibilityTimeout(timeout);
- List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
- List<QueueMessage> queueMessages = new ArrayList<>(messages.size());
- for (Message message : messages) {
- QueueMessage queueMessage = new QueueMessage(message.getMessageId(),message.getReceiptHandle(),message.getBody());
- queueMessages.add(queueMessage);
- }
- return queueMessages;
- }
-
- public void commitMessage( QueueMessage queueMessage){
- sqs.deleteMessage(new DeleteMessageRequest()
- .withQueueUrl(getQueue().getUrl())
- .withReceiptHandle(queueMessage.getHandle()));
- }
-
- public void commitMessages( List<QueueMessage> queueMessages){
- List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
- for(QueueMessage message : queueMessages){
- entries.add(new DeleteMessageBatchRequestEntry(message.getMessageId(),message.getHandle()));
- }
- DeleteMessageBatchRequest request = new DeleteMessageBatchRequest(getQueue().getUrl(),entries);
- DeleteMessageBatchResult result = sqs.deleteMessageBatch(request);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bfcc33e1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
new file mode 100644
index 0000000..d1f526b
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.queue.impl;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.SDKGlobalConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
+import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.sqs.AmazonSQSClient;
+import com.amazonaws.services.sqs.model.*;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import org.apache.commons.lang.StringUtils;
+import org.apache.usergrid.persistence.queue.*;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SQSQueueManagerImpl implements QueueManager {
+ private final AmazonSQSClient sqs;
+ private final QueueScope scope;
+ private final QueueFig fig;
+ private Queue queue;
+
+ /**
+  * Builds a queue manager for the given scope, backed by an Amazon SQS client.
+  * Credentials are obtained from a {@link UsergridAwsCredentialsProvider} and the client is
+  * pointed at the region named by {@code fig.getRegion()}.
+  *
+  * @param scope the queue scope this manager serves (assisted injection)
+  * @param fig queue configuration; supplies the AWS region name
+  */
+ @Inject
+ public SQSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig){
+     this.scope = scope;
+     this.fig = fig;
+     // Constructing the provider throws if either AWS key is missing or empty.
+     final AWSCredentials credentials = new UsergridAwsCredentialsProvider().getCredentials();
+     this.sqs = new AmazonSQSClient(credentials);
+     this.sqs.setRegion(Region.getRegion(Regions.fromName(fig.getRegion())));
+ }
+
+
+ /**
+  * Issues a create-queue request to SQS using the name derived from this manager's scope.
+  *
+  * @return a {@link Queue} wrapping the queue URL reported by SQS
+  */
+ public Queue createQueue(){
+     final CreateQueueRequest request = new CreateQueueRequest();
+     request.setQueueName(getName());
+     final String queueUrl = sqs.createQueue(request).getQueueUrl();
+     return new Queue(queueUrl);
+ }
+
+ /**
+  * Derives the queue name by concatenating the application type, the application UUID and
+  * the scope name, in that order and without separators.
+  */
+ private String getName() {
+     final StringBuilder name = new StringBuilder();
+     name.append(scope.getApplication().getType());
+     name.append(scope.getApplication().getUuid().toString());
+     name.append(scope.getName());
+     return name.toString();
+ }
+
+ /**
+  * Returns the SQS queue for this scope, resolving it on first use and caching it afterwards.
+  * <p>
+  * The existing queues are listed and searched for one whose URL ends with
+  * {@code "/" + getName()}; if none is found a new queue is created. The match is made on
+  * the final URL path segment rather than on a substring, so a queue whose name merely
+  * contains this scope's queue name is not selected by mistake.
+  *
+  * @return the cached, discovered or newly created queue
+  */
+ public Queue getQueue(){
+     if (queue != null) {
+         return queue;
+     }
+     // Computed once; the name does not change while the listing is scanned.
+     final String urlSuffix = "/" + getName();
+     for (String queueUrl : sqs.listQueues().getQueueUrls()) {
+         if (queueUrl.endsWith(urlSuffix)) {
+             queue = new Queue(queueUrl);
+             return queue;
+         }
+     }
+     queue = createQueue();
+     return queue;
+ }
+
+ /**
+  * Sends a single message to this scope's queue.
+  *
+  * @param body the message body
+  */
+ public void sendMessage(String body){
+     final String queueUrl = getQueue().getUrl();
+     sqs.sendMessage(new SendMessageRequest(queueUrl, body));
+ }
+
+ /**
+  * Sends the given message bodies to this scope's queue using SQS batch requests.
+  * <p>
+  * SQS requires every batch entry to carry an id that is unique within its request and
+  * accepts at most 10 entries per request, so the bodies are sent in chunks of up to 10 and
+  * each entry is given its position within the chunk as its id. An empty list is a no-op,
+  * because SQS rejects an empty batch.
+  *
+  * @param bodies the message bodies to send, in order
+  */
+ public void sendMessages(List<String> bodies){
+     if (bodies.isEmpty()) {
+         return;
+     }
+     final int maxBatchSize = 10;
+     final String queueUrl = getQueue().getUrl();
+     for (int start = 0; start < bodies.size(); start += maxBatchSize) {
+         final int end = Math.min(start + maxBatchSize, bodies.size());
+         final List<SendMessageBatchRequestEntry> entries = new ArrayList<>(end - start);
+         for (String body : bodies.subList(start, end)) {
+             SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry();
+             // Ids only need to be unique within a single batch request.
+             entry.setId(Integer.toString(entries.size()));
+             entry.setMessageBody(body);
+             entries.add(entry);
+         }
+         SendMessageBatchRequest request = new SendMessageBatchRequest(queueUrl);
+         request.setEntries(entries);
+         sqs.sendMessageBatch(request);
+     }
+ }
+
+ /**
+  * Receives messages from this scope's queue and converts them to {@link QueueMessage}s.
+  *
+  * @param limit passed to SQS as the maximum number of messages to return
+  * @param timeout passed to SQS as the visibility timeout of the received messages
+  * @return one {@link QueueMessage} (message id, receipt handle, body) per received message;
+  *         empty if SQS returned none
+  */
+ public List<QueueMessage> getMessages( int limit,int timeout){
+     ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(getQueue().getUrl());
+     receiveMessageRequest.setMaxNumberOfMessages(limit);
+     receiveMessageRequest.setVisibilityTimeout(timeout);
+     List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
+     List<QueueMessage> queueMessages = new ArrayList<>(messages.size());
+     for (Message message : messages) {
+         queueMessages.add(
+             new QueueMessage(message.getMessageId(), message.getReceiptHandle(), message.getBody()));
+     }
+     return queueMessages;
+ }
+
+ /**
+  * Deletes a single message from this scope's queue, identified by its receipt handle.
+  *
+  * @param queueMessage the message to delete; its handle is used as the SQS receipt handle
+  */
+ public void commitMessage( QueueMessage queueMessage){
+     final DeleteMessageRequest request = new DeleteMessageRequest();
+     request.setQueueUrl(getQueue().getUrl());
+     request.setReceiptHandle(queueMessage.getHandle());
+     sqs.deleteMessage(request);
+ }
+
+ /**
+  * Deletes the given messages from this scope's queue with a single SQS batch delete.
+  * Each entry uses the message id as the batch entry id and the message handle as the
+  * receipt handle. An empty list is a no-op, because SQS rejects an empty batch.
+  * <p>
+  * The batch result returned by SQS is not inspected here, so per-entry failures are not
+  * reported to the caller.
+  *
+  * @param queueMessages the messages to delete
+  */
+ public void commitMessages( List<QueueMessage> queueMessages){
+     if (queueMessages.isEmpty()) {
+         return;
+     }
+     List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>(queueMessages.size());
+     for (QueueMessage message : queueMessages) {
+         entries.add(new DeleteMessageBatchRequestEntry(message.getMessageId(), message.getHandle()));
+     }
+     sqs.deleteMessageBatch(new DeleteMessageBatchRequest(getQueue().getUrl(), entries));
+ }
+ /**
+  * Credentials provider that reads the AWS access key and secret key from JVM system
+  * properties. The property names are the values of
+  * {@link SDKGlobalConfiguration#ACCESS_KEY_ENV_VAR} and
+  * {@link SDKGlobalConfiguration#SECRET_KEY_ENV_VAR}.
+  */
+ public class UsergridAwsCredentialsProvider implements AWSCredentialsProvider {
+
+     private AWSCredentials creds;
+
+     /**
+      * Creates the provider and validates that both keys are currently available.
+      *
+      * @throws AmazonClientException if either key is missing or empty
+      */
+     public UsergridAwsCredentialsProvider(){
+         init();
+     }
+
+     /** Reads the named system property and trims surrounding whitespace. */
+     private String readTrimmed(String propertyName) {
+         return StringUtils.trim(System.getProperty(propertyName));
+     }
+
+     /**
+      * Installs a credentials object that looks the keys up on every call, then verifies
+      * that both keys are non-empty right now.
+      */
+     private void init() {
+         creds = new AWSCredentials() {
+             @Override
+             public String getAWSAccessKeyId() {
+                 return readTrimmed(SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR);
+             }
+
+             @Override
+             public String getAWSSecretKey() {
+                 return readTrimmed(SDKGlobalConfiguration.SECRET_KEY_ENV_VAR);
+             }
+         };
+         final boolean accessKeyMissing = StringUtils.isEmpty(creds.getAWSAccessKeyId());
+         // The secret key is only consulted when the access key is present.
+         if (accessKeyMissing || StringUtils.isEmpty(creds.getAWSSecretKey())) {
+             throw new AmazonClientException("could not retrieve credentials from system properties");
+         }
+     }
+
+     /** Returns the credentials object installed by the most recent initialisation. */
+     @Override
+     public AWSCredentials getCredentials() {
+         return creds;
+     }
+
+
+     /** Re-runs initialisation, including the check that both keys are present. */
+     @Override
+     public void refresh() {
+         init();
+     }
+ }
+}