You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/03/11 14:48:09 UTC
[1/3] incubator-usergrid git commit: Fixes ttl on map entry keys
Repository: incubator-usergrid
Updated Branches:
refs/heads/USERGRID-473 c021d5d6a -> 0aeaa8815 (forced update)
Fixes ttl on map entry keys
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/06e7ad6c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/06e7ad6c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/06e7ad6c
Branch: refs/heads/USERGRID-473
Commit: 06e7ad6c059ed295238c24a2610e2047bf79c0d4
Parents: 5d95ebb
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 10 12:19:23 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 10 12:19:23 2015 -0600
----------------------------------------------------------------------
.../map/impl/MapSerializationImpl.java | 37 +++++++++++++++-----
1 file changed, 29 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/06e7ad6c/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
index d3bd3c5..715c202 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
@@ -129,10 +129,15 @@ public class MapSerializationImpl implements MapSerialization {
public void putString( final MapScope scope, final String key, final String value ) {
final RowOp op = new RowOp() {
@Override
- public void rowOp( final ScopedRowKey<MapEntryKey> scopedRowKey,
- final ColumnListMutation<Boolean> columnListMutation ) {
+ public void putValue(final ColumnListMutation<Boolean> columnListMutation ) {
columnListMutation.putColumn( true, value );
}
+
+
+ @Override
+ public void putKey(final ColumnListMutation<String> keysMutation ) {
+ keysMutation.putColumn( key, true );
+ }
};
@@ -146,10 +151,15 @@ public class MapSerializationImpl implements MapSerialization {
final RowOp op = new RowOp() {
@Override
- public void rowOp( final ScopedRowKey<MapEntryKey> scopedRowKey,
- final ColumnListMutation<Boolean> columnListMutation ) {
+ public void putValue( final ColumnListMutation<Boolean> columnListMutation ) {
columnListMutation.putColumn( true, value, ttl );
}
+
+
+ @Override
+ public void putKey( final ColumnListMutation<String> keysMutation ) {
+ keysMutation.putColumn( key, true, ttl );
+ }
};
@@ -179,7 +189,7 @@ public class MapSerializationImpl implements MapSerialization {
// entry
- rowOp.rowOp( entryRowKey, batch.withRow( MAP_ENTRIES, entryRowKey ) );
+ rowOp.putValue( batch.withRow( MAP_ENTRIES, entryRowKey ) );
//add it to the keys
@@ -189,20 +199,31 @@ public class MapSerializationImpl implements MapSerialization {
final BucketScopedRowKey<String> keyRowKey = BucketScopedRowKey.fromKey( scope.getApplication(), key, bucket );
//serialize to the entry
- batch.withRow( MAP_KEYS, keyRowKey ).putColumn( key, true );
+
+ rowOp.putKey( batch.withRow( MAP_KEYS, keyRowKey ) );
executeBatch( batch );
}
+
+ /**
+ * Callbacks for performing row operations
+ */
private static interface RowOp{
/**
* Callback that writes the entry's value column into the entries row
- * @param scopedRowKey The row key
* @param columnListMutation The column mutation
*/
- void rowOp(final ScopedRowKey<MapEntryKey> scopedRowKey, final ColumnListMutation<Boolean> columnListMutation);
+ void putValue( final ColumnListMutation<Boolean> columnListMutation );
+
+
+ /**
+ * Write the key
+ * @param keysMutation The column mutation for the row in the map keys column family
+ */
+ void putKey( final ColumnListMutation<String> keysMutation );
}
[2/3] incubator-usergrid git commit: Merge branch 'two-dot-o' of
https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o
Posted by sf...@apache.org.
Merge branch 'two-dot-o' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/3bd96c9a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/3bd96c9a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/3bd96c9a
Branch: refs/heads/USERGRID-473
Commit: 3bd96c9a0c1220905c315514c4d8d5d250f29f62
Parents: ce14542 06e7ad6
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Mar 11 07:44:47 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Mar 11 07:44:47 2015 -0600
----------------------------------------------------------------------
.../map/impl/MapSerializationImpl.java | 37 +++++++++++++++-----
1 file changed, 29 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
[3/3] incubator-usergrid git commit: adding dead letter
Posted by sf...@apache.org.
adding dead letter
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/0aeaa881
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/0aeaa881
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/0aeaa881
Branch: refs/heads/USERGRID-473
Commit: 0aeaa881556fa85d330fb877deca522c0b28e2f5
Parents: 3bd96c9
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Mar 11 07:47:57 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Mar 11 07:47:57 2015 -0600
----------------------------------------------------------------------
.../queue/impl/SQSQueueManagerImpl.java | 84 ++++++++++++--------
.../persistence/queue/QueueManagerTest.java | 2 -
2 files changed, 52 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0aeaa881/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index f202fda..cfaf0b1 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -36,12 +36,11 @@ import com.google.inject.assistedinject.Assisted;
import org.apache.commons.lang.StringUtils;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import org.apache.usergrid.persistence.queue.*;
+import org.apache.usergrid.persistence.queue.Queue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -55,33 +54,49 @@ public class SQSQueueManagerImpl implements QueueManager {
private static SmileFactory smileFactory = new SmileFactory();
private static LoadingCache<SqsLoader, Queue> urlMap = CacheBuilder.newBuilder()
- .maximumSize(1000)
- .build(new CacheLoader<SqsLoader, Queue>() {
- @Override
- public Queue load(SqsLoader queueLoader) throws Exception {
- Queue queue = null;
- try {
- GetQueueUrlResult result = queueLoader.getClient().getQueueUrl(queueLoader.getKey());
- queue = new Queue(result.getQueueUrl());
- } catch (QueueDoesNotExistException queueDoesNotExistException) {
- queue = null;
- } catch (Exception e) {
- LOG.error("failed to get queue from service", e);
- throw e;
- }
- if (queue == null) {
- String name = queueLoader.getKey();
- CreateQueueRequest createQueueRequest = new CreateQueueRequest()
- .withQueueName(name);
- CreateQueueResult result = queueLoader.getClient().createQueue(createQueueRequest);
- String url = result.getQueueUrl();
- queue = new Queue(url);
- LOG.info("Created queue with url {}", url);
- }
- return queue;
+ .maximumSize(1000)
+ .build(new CacheLoader<SqsLoader, Queue>() {
+ @Override
+ public Queue load(SqsLoader queueLoader) throws Exception {
+ Queue queue = null;
+ try {
+ GetQueueUrlResult result = queueLoader.getClient().getQueueUrl(queueLoader.getKey());
+ queue = new Queue(result.getQueueUrl());
+ } catch (QueueDoesNotExistException queueDoesNotExistException) {
+ queue = null;
+ } catch (Exception e) {
+ LOG.error("failed to get queue from service", e);
+ throw e;
}
+ if (queue == null) {
+ String name = queueLoader.getKey();
+ CreateQueueRequest createQueueRequest = new CreateQueueRequest()
+ .withQueueName(name);
+ CreateQueueResult result = queueLoader.getClient().createQueue(createQueueRequest);
+ String queueUrl = result.getQueueUrl();
+
+ setDeadLetterQueue(queueLoader.client,queueLoader.config(), queueUrl, name+"_dead");
+ queue = new Queue(queueUrl);
+ LOG.info("Created queue with url {}", queueUrl);
+ }
+ return queue;
}
- );
+ }
+ );
+
+ /**
+  * Creates the queue named {@code deadLetterName} and registers it as the dead letter
+  * queue of the queue at {@code queueUrl} by setting that queue's {@code RedrivePolicy}.
+  *
+  * The policy's {@code deadLetterTargetArn} must be the ARN of the dead letter queue,
+  * not its URL, so the ARN is read back from the new queue's {@code QueueArn} attribute.
+  *
+  * @param client SQS client used for every call made here
+  * @param fig supplies the max receive count written into the redrive policy
+  * @param queueUrl URL of the source queue that receives the redrive policy
+  * @param deadLetterName name of the dead letter queue to create
+  */
+ private static void setDeadLetterQueue(AmazonSQSClient client, QueueFig fig, String queueUrl, String deadLetterName) {
+     CreateQueueRequest deadLetterQueueRequest = new CreateQueueRequest()
+         .withQueueName(deadLetterName);
+     CreateQueueResult deadLetterResult = client.createQueue(deadLetterQueueRequest);
+     String deadLetterUrl = deadLetterResult.getQueueUrl();
+
+     // createQueue only returns the URL; the redrive policy needs the ARN, so look it up.
+     // Fully-qualified name is used so this method does not depend on an extra import.
+     com.amazonaws.services.sqs.model.GetQueueAttributesRequest arnRequest =
+         new com.amazonaws.services.sqs.model.GetQueueAttributesRequest(deadLetterUrl)
+             .withAttributeNames("QueueArn");
+     String deadLetterArn = client.getQueueAttributes(arnRequest).getAttributes().get("QueueArn");
+
+     String redrivePolicy = String.format("{\"maxReceiveCount\":\"%s\", \"deadLetterTargetArn\":\"%s\"}", fig.getMaxReceiveCount(), deadLetterArn);
+     SetQueueAttributesRequest queueAttributes = new SetQueueAttributesRequest();
+     Map<String, String> attributes = new HashMap<>();
+     attributes.put("RedrivePolicy", redrivePolicy);
+     queueAttributes.setAttributes(attributes);
+     queueAttributes.setQueueUrl(queueUrl);
+     client.setQueueAttributes(queueAttributes);
+ }
@Inject
public SQSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig){
@@ -111,7 +126,7 @@ public class SQSQueueManagerImpl implements QueueManager {
public Queue getQueue() {
try {
- Queue queue = urlMap.get(new SqsLoader(getName(),sqs));
+ Queue queue = urlMap.get(new SqsLoader(getName(),sqs,fig));
return queue;
} catch (ExecutionException ee) {
throw new RuntimeException(ee);
@@ -127,6 +142,7 @@ public class SQSQueueManagerImpl implements QueueManager {
waitTime = waitTime/1000;
String url = getQueue().getUrl();
LOG.info("Getting {} messages from {}", limit, url);
+
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(url);
receiveMessageRequest.setMaxNumberOfMessages(limit);
receiveMessageRequest.setVisibilityTimeout(transactionTimeout);
@@ -191,8 +207,8 @@ public class SQSQueueManagerImpl implements QueueManager {
LOG.info("Commit message {} to queue {}",queueMessage.getMessageId(),url);
sqs.deleteMessage(new DeleteMessageRequest()
- .withQueueUrl(url)
- .withReceiptHandle(queueMessage.getHandle()));
+ .withQueueUrl(url)
+ .withReceiptHandle(queueMessage.getHandle()));
}
@@ -230,10 +246,12 @@ public class SQSQueueManagerImpl implements QueueManager {
public class SqsLoader {
private final String key;
private final AmazonSQSClient client;
+ private final QueueFig fig;
- public SqsLoader(String key, AmazonSQSClient client) {
+ public SqsLoader(String key, AmazonSQSClient client,QueueFig fig) {
this.key = key;
this.client = client;
+ this.fig = fig;
}
public AmazonSQSClient getClient() {
@@ -265,5 +283,7 @@ public class SQSQueueManagerImpl implements QueueManager {
return getKey();
}
+ public QueueFig config(){return fig;}
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0aeaa881/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
index 935fd16..89fb9cd 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
@@ -76,7 +76,6 @@ public class QueueManagerTest {
assertEquals(scope.getApplication().getUuid(),uuid);
}
- @Ignore("need aws creds")
@Test
public void send() throws IOException,ClassNotFoundException{
String value = "bodytest";
@@ -92,7 +91,6 @@ public class QueueManagerTest {
}
- @Ignore("need aws creds")
@Test
public void sendMore() throws IOException,ClassNotFoundException{
HashMap<String,String> values = new HashMap<>();