You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/10/17 02:02:47 UTC
usergrid git commit: Adds strong consistency read to maps. Persists
ES batches into Cassandra for multi region execution.
Repository: usergrid
Updated Branches:
refs/heads/USERGRID-1048 [created] 94a907812
Adds strong consistency read to maps. Persists ES batches into Cassandra for multi region execution.
A bug in wiring JSON to SQS still exists: it incorrectly escapes some message subtypes.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/94a90781
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/94a90781
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/94a90781
Branch: refs/heads/USERGRID-1048
Commit: 94a9078125fc32d755e33e562f8e8fd8624641c1
Parents: 2b22c61
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Oct 16 18:02:44 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Oct 16 18:02:44 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 186 +++++++++++---
.../asyncevents/AsyncEventService.java | 1 +
.../asyncevents/AsyncIndexProvider.java | 22 +-
.../asyncevents/model/AsyncEvent.java | 3 +-
.../model/ElasticsearchIndexEvent.java | 50 ++++
.../index/IndexProcessorFig.java | 8 +
.../util/ObjectJsonSerializer.java | 74 ++++++
.../index/AmazonAsyncEventServiceTest.java | 6 +-
.../index/AsyncIndexServiceTest.java | 2 +-
.../usergrid/persistence/map/MapManager.java | 25 +-
.../persistence/map/impl/MapManagerImpl.java | 6 +
.../persistence/map/impl/MapSerialization.java | 27 +-
.../map/impl/MapSerializationImpl.java | 248 ++++++++++---------
.../index/impl/DeIndexOperation.java | 4 +
.../persistence/index/impl/IndexOperation.java | 4 +
.../index/impl/IndexOperationMessage.java | 5 +
.../persistence/queue/DefaultQueueManager.java | 12 +-
.../persistence/queue/QueueManager.java | 8 +-
.../queue/impl/SNSQueueManagerImpl.java | 188 ++++++++++----
.../queue/impl/SQSQueueManagerImpl.java | 28 ++-
.../services/queues/ImportQueueManager.java | 9 +-
21 files changed, 666 insertions(+), 250 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 95126c6..c9f0953 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -21,13 +21,20 @@ package org.apache.usergrid.corepersistence.asyncevents;
import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.google.common.base.Optional;
+
+import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.corepersistence.util.ObjectJsonSerializer;
+import org.apache.usergrid.exception.NotImplementedException;
import org.apache.usergrid.persistence.index.impl.IndexProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,8 +61,13 @@ import org.apache.usergrid.persistence.index.EntityIndex;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.IndexLocationStrategy;
import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+import org.apache.usergrid.persistence.map.MapManager;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.map.MapScope;
+import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import org.apache.usergrid.persistence.queue.QueueManager;
import org.apache.usergrid.persistence.queue.QueueManagerFactory;
import org.apache.usergrid.persistence.queue.QueueMessage;
@@ -82,12 +94,13 @@ public class AmazonAsyncEventService implements AsyncEventService {
private static final Logger logger = LoggerFactory.getLogger(AmazonAsyncEventService.class);
+ private static final ObjectJsonSerializer OBJECT_JSON_SERIALIZER = new ObjectJsonSerializer( );
+
// SQS maximum receive messages is 10
private static final int MAX_TAKE = 10;
public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars
private final QueueManager queue;
- private final QueueScope queueScope;
private final IndexProcessorFig indexProcessorFig;
private final IndexProducer indexProducer;
private final EntityCollectionManagerFactory entityCollectionManagerFactory;
@@ -109,6 +122,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
private final AtomicLong counter = new AtomicLong();
private final AtomicLong inFlight = new AtomicLong();
private final Histogram messageCycle;
+ private final MapManager esMapPersistence;
//the actively running subscription
private List<Subscription> subscriptions = new ArrayList<>();
@@ -123,6 +137,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
final IndexLocationStrategyFactory indexLocationStrategyFactory,
final EntityIndexFactory entityIndexFactory,
final EventBuilder eventBuilder,
+ final MapManagerFactory mapManagerFactory,
final RxTaskScheduler rxTaskScheduler ) {
this.indexProducer = indexProducer;
@@ -130,10 +145,16 @@ public class AmazonAsyncEventService implements AsyncEventService {
this.indexLocationStrategyFactory = indexLocationStrategyFactory;
this.entityIndexFactory = entityIndexFactory;
this.eventBuilder = eventBuilder;
+
+ final MapScope mapScope = new MapScopeImpl( CpNamingUtils.getManagementApplicationId(), "indexEvents");
+
+ this.esMapPersistence = mapManagerFactory.createMapManager( mapScope );
+
this.rxTaskScheduler = rxTaskScheduler;
- this.queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL);
+ QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL);
this.queue = queueManagerFactory.getQueueManager(queueScope);
+
this.indexProcessorFig = indexProcessorFig;
this.writeTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.write");
@@ -158,7 +179,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
/**
* Offer the EntityIdScope to SQS
*/
- private void offer(final Object operation) {
+ private void offer(final Serializable operation) {
final Timer.Context timer = this.writeTimer.time();
try {
@@ -213,7 +234,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
final Timer.Context timer = this.ackTimer.time();
try{
- queue.commitMessage(message);
+ queue.commitMessage( message );
//decrement our in-flight counter
inFlight.decrementAndGet();
@@ -235,7 +256,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
final Timer.Context timer = this.ackTimer.time();
try{
- queue.commitMessages(messages);
+ queue.commitMessages( messages );
//decrement our in-flight counter
inFlight.decrementAndGet();
@@ -296,7 +317,13 @@ public class AmazonAsyncEventService implements AsyncEventService {
handleInitializeApplicationIndex(event, message);
indexoperationObservable = Observable.just(new IndexOperationMessage());
validateEmptySets = false; //do not check this one for an empty set b/c it will be empty.
- } else {
+ } else if (event instanceof ElasticsearchIndexEvent){
+ handleIndexOperation( (ElasticsearchIndexEvent)event );
+ indexoperationObservable = Observable.just( new IndexOperationMessage() );
+ validateEmptySets = false; //do not check this one for an empty set b/c it will be empty.
+ }
+
+ else {
throw new Exception("Unknown EventType");//TODO: print json instead
}
@@ -434,6 +461,85 @@ public class AmazonAsyncEventService implements AsyncEventService {
offer( new EntityDeleteEvent( new EntityIdScope( applicationScope, entityId ) ) );
}
+
+ /**
+ * Queue up an indexOperationMessage for multi region execution
+ * @param indexOperationMessage
+ */
+ public void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage ) {
+
+ final String jsonValue = OBJECT_JSON_SERIALIZER.toByteBuffer( indexOperationMessage );
+
+ final UUID newMessageId = UUIDGenerator.newTimeUUID();
+
+ //write to the map in ES
+ esMapPersistence.putString( newMessageId.toString(), jsonValue, indexProcessorFig.getIndexMessageTtl() );
+
+
+
+ //now queue up the index message
+
+ final ElasticsearchIndexEvent elasticsearchIndexEvent = new ElasticsearchIndexEvent( newMessageId );
+
+ //send to the topic so all regions index the batch
+ try {
+ queue.sendMessageToTopic( elasticsearchIndexEvent );
+ }
+ catch ( IOException e ) {
+ throw new RuntimeException( "Unable to pulish to topic", e );
+ }
+ }
+
+ public void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){
+ Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" );
+
+ final UUID messageId = elasticsearchIndexEvent.getIndexBatchId();
+
+ Preconditions.checkNotNull( messageId, "messageId must not be null" );
+
+
+ //load the entity
+
+ final String message = esMapPersistence.getString( messageId.toString() );
+
+ String highConsistency = null;
+
+ if(message == null){
+ logger.error( "Receive message with id {} to process, unable to find it, reading with higher consistency level" );
+
+ highConsistency = esMapPersistence.getStringHighConsistency( messageId.toString() );
+
+ }
+
+ //read the value from the string
+
+ final IndexOperationMessage indexOperationMessage;
+
+ //our original local read has it, parse it.
+ if(message != null){
+ indexOperationMessage = OBJECT_JSON_SERIALIZER.fromString( message, IndexOperationMessage.class );
+ }
+ //we tried to read it at a higher consistency level and it works
+ else if (highConsistency != null){
+ indexOperationMessage = OBJECT_JSON_SERIALIZER.fromString( highConsistency, IndexOperationMessage.class );
+ }
+
+ //we couldn't find it, bail
+ else{
+ logger.error( "Unable to find the ES batch with id {} to process at a higher consistency level" );
+
+ throw new RuntimeException( "Unable to find the ES batch to process with message id " + messageId );
+ }
+
+
+
+ //now execute it
+ indexProducer.put(indexOperationMessage).toBlocking().last();
+
+ }
+
+
+
@Override
public long getQueueDepth() {
return queue.getQueueDepth();
@@ -510,71 +616,75 @@ public class AmazonAsyncEventService implements AsyncEventService {
synchronized (mutex) {
Observable<List<QueueMessage>> consumer =
- Observable.create(new Observable.OnSubscribe<List<QueueMessage>>() {
+ Observable.create( new Observable.OnSubscribe<List<QueueMessage>>() {
@Override
- public void call(final Subscriber<? super List<QueueMessage>> subscriber) {
+ public void call( final Subscriber<? super List<QueueMessage>> subscriber ) {
//name our thread so it's easy to see
- Thread.currentThread().setName("QueueConsumer_" + counter.incrementAndGet());
+ Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() );
List<QueueMessage> drainList = null;
do {
try {
- drainList = take().toList().toBlocking().lastOrDefault(null);
+ drainList = take().toList().toBlocking().lastOrDefault( null );
//emit our list in it's entity to hand off to a worker pool
- subscriber.onNext(drainList);
+ subscriber.onNext( drainList );
//take since we're in flight
- inFlight.addAndGet(drainList.size());
- } catch (Throwable t) {
+ inFlight.addAndGet( drainList.size() );
+ }
+ catch ( Throwable t ) {
final long sleepTime = indexProcessorFig.getFailureRetryTime();
- logger.error("Failed to dequeue. Sleeping for {} milliseconds", sleepTime, t);
+ logger.error( "Failed to dequeue. Sleeping for {} milliseconds", sleepTime, t );
- if (drainList != null) {
- inFlight.addAndGet(-1 * drainList.size());
+ if ( drainList != null ) {
+ inFlight.addAndGet( -1 * drainList.size() );
}
try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException ie) {
+ Thread.sleep( sleepTime );
+ }
+ catch ( InterruptedException ie ) {
//swallow
}
indexErrorCounter.inc();
}
}
- while (true);
+ while ( true );
}
- })
+ } )
//this won't block our read loop, just reads and proceeds
- .map(messages ->
- {
- if (messages == null || messages.size() == 0) {
+ .map( messages -> {
+ if ( messages == null || messages.size() == 0 ) {
return null;
}
try {
- List<IndexEventResult> indexEventResults = callEventHandlers(messages);
- List<QueueMessage> messagesToAck = submitToIndex(indexEventResults);
- if (messagesToAck == null || messagesToAck.size() == 0) {
- logger.error("No messages came back from the queue operation should have seen "+messages.size(),messages);
+ List<IndexEventResult> indexEventResults = callEventHandlers( messages );
+ List<QueueMessage> messagesToAck = submitToIndex( indexEventResults );
+ if ( messagesToAck == null || messagesToAck.size() == 0 ) {
+ logger.error( "No messages came back from the queue operation should have seen "
+ + messages.size(), messages );
return messagesToAck;
}
- if(messagesToAck.size()<messages.size()){
- logger.error("Missing messages from queue post operation",messages,messagesToAck);
+ if ( messagesToAck.size() < messages.size() ) {
+ logger.error( "Missing messages from queue post operation", messages,
+ messagesToAck );
}
//ack each message, but only if we didn't error.
- ack(messagesToAck);
+ ack( messagesToAck );
return messagesToAck;
- } catch (Exception e) {
- logger.error("failed to ack messages to sqs", e);
+ }
+ catch ( Exception e ) {
+ logger.error( "failed to ack messages to sqs", e );
return null;
//do not rethrow so we can process all of them
}
- });
+ } );
//start in the background
@@ -619,12 +729,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
//send the batch
//TODO: should retry?
- try {
- indexProducer.put(combined).toBlocking().lastOrDefault(null);
- }catch (Exception e){
- logger.error("Failed to submit to index producer",e);
- throw e;
- }
+ queueIndexOperationMessage( combined );
+
return messagesToAck;
}
@@ -671,4 +777,6 @@ public class AmazonAsyncEventService implements AsyncEventService {
return creationTime;
}
}
+
+
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index ae5688c..dcfffcb 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -25,6 +25,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.entities.Application;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.index.IndexLocationStrategy;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index e9e36f0..3865ecb 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -28,6 +28,7 @@ import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.impl.IndexProducer;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
import org.apache.usergrid.persistence.queue.QueueManagerFactory;
import com.google.inject.Inject;
@@ -51,20 +52,18 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
private final IndexLocationStrategyFactory indexLocationStrategyFactory;
private final EntityIndexFactory entityIndexFactory;
private final IndexProducer indexProducer;
+ private final MapManagerFactory mapManagerFactory;
private AsyncEventService asyncEventService;
@Inject
- public AsyncIndexProvider(final IndexProcessorFig indexProcessorFig,
- final QueueManagerFactory queueManagerFactory,
- final MetricsFactory metricsFactory,
- final RxTaskScheduler rxTaskScheduler,
- final EntityCollectionManagerFactory entityCollectionManagerFactory,
- final EventBuilder eventBuilder,
- final IndexLocationStrategyFactory indexLocationStrategyFactory,
- final EntityIndexFactory entityIndexFactory,
- final IndexProducer indexProducer) {
+ public AsyncIndexProvider( final IndexProcessorFig indexProcessorFig, final QueueManagerFactory queueManagerFactory,
+ final MetricsFactory metricsFactory, final RxTaskScheduler rxTaskScheduler, final
+ EntityCollectionManagerFactory entityCollectionManagerFactory,
+ final EventBuilder eventBuilder, final IndexLocationStrategyFactory indexLocationStrategyFactory,
+ final EntityIndexFactory entityIndexFactory, final IndexProducer indexProducer,
+ final MapManagerFactory mapManagerFactory ) {
this.indexProcessorFig = indexProcessorFig;
this.queueManagerFactory = queueManagerFactory;
@@ -75,6 +74,7 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
this.indexLocationStrategyFactory = indexLocationStrategyFactory;
this.entityIndexFactory = entityIndexFactory;
this.indexProducer = indexProducer;
+ this.mapManagerFactory = mapManagerFactory;
}
@@ -99,10 +99,10 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
return new InMemoryAsyncEventService(eventBuilder, rxTaskScheduler, indexProducer,indexProcessorFig.resolveSynchronously());
case SQS:
return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
- entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler );
+ entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, mapManagerFactory, rxTaskScheduler );
case SNS:
return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
- entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler );
+ entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, mapManagerFactory, rxTaskScheduler );
default:
throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed");
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
index 6b45297..1af54e3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
@@ -39,7 +39,8 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
@JsonSubTypes.Type( value = EdgeIndexEvent.class, name = "edgeIndexEvent" ),
@JsonSubTypes.Type( value = EntityDeleteEvent.class, name = "entityDeleteEvent" ),
@JsonSubTypes.Type( value = EntityIndexEvent.class, name = "entityIndexEvent" ),
- @JsonSubTypes.Type( value = InitializeApplicationIndexEvent.class, name = "initializeApplicationIndexEvent" )
+ @JsonSubTypes.Type( value = InitializeApplicationIndexEvent.class, name = "initializeApplicationIndexEvent" ),
+ @JsonSubTypes.Type( value = ElasticsearchIndexEvent.class, name = "elasticsearchIndexEvent" )
} )
public abstract class AsyncEvent implements Serializable {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java
new file mode 100644
index 0000000..207b15e
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents.model;
+
+
+import java.util.UUID;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+
+/**
+ * An index event for publishing to elastic search
+ */
+public final class ElasticsearchIndexEvent extends AsyncEvent {
+
+
+ @JsonProperty
+ protected UUID indexBatchId;
+
+ public ElasticsearchIndexEvent() {
+ }
+
+ public ElasticsearchIndexEvent( UUID indexBatchId ) {
+ this.indexBatchId = indexBatchId;
+ }
+
+
+ /**
+ * Get the unique id of the index batch this event refers to
+ * @return
+ */
+ public UUID getIndexBatchId() {
+ return indexBatchId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index 7d022e5..6fd73b4 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -103,4 +103,12 @@ public interface IndexProcessorFig extends GuicyFig {
@Default("false")
@Key("elasticsearch.queue_impl.resolution")
boolean resolveSynchronously();
+
+ /**
+ * Get the message TTL in milliseconds
+ * @return
+ */
+ @Default("604800000")
+ @Key( "elasticsearch.message.ttl" )
+ int getIndexMessageTtl();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java
new file mode 100644
index 0000000..dbd5ca3
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.util;
+
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+
+
+/**
+ * A utility class to serialize and deserialize objects as JSON strings
+ */
+public final class ObjectJsonSerializer {
+
+
+ private final JsonFactory JSON_FACTORY = new JsonFactory();
+
+ private final ObjectMapper MAPPER = new ObjectMapper( JSON_FACTORY );
+
+ public ObjectJsonSerializer( ) {
+ MAPPER.enableDefaultTypingAsProperty( ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class" );
+ }
+
+
+ public <T extends Serializable> String toByteBuffer( final T toSerialize ) {
+
+ Preconditions.checkNotNull( toSerialize, "toSerialize must not be null" );
+ final String stringValue;
+ //mark this version as empty
+
+ //Convert to internal entity map
+ try {
+ stringValue = MAPPER.writeValueAsString( toSerialize );
+ }
+ catch ( JsonProcessingException jpe ) {
+ throw new RuntimeException( "Unable to serialize entity", jpe );
+ }
+
+ return stringValue;
+ }
+
+
+ public <T extends Serializable> T fromString( final String value, final Class<T> toSerialize ) {
+
+ Preconditions.checkNotNull( value, "value must not be null" );
+
+ try {
+ return MAPPER.readValue( value, toSerialize );
+ }
+ catch ( IOException e ) {
+ throw new RuntimeException( "Unable to deserialize", e );
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
index a14437c..e83d6f8 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
@@ -34,6 +34,7 @@ import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.core.test.UseModules;
import org.apache.usergrid.persistence.index.impl.EsRunner;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
import org.apache.usergrid.persistence.queue.QueueManagerFactory;
import com.google.inject.Inject;
@@ -79,13 +80,16 @@ public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest {
@Inject
public IndexLocationStrategyFactory indexLocationStrategyFactory;
+ @Inject
+ public MapManagerFactory mapManagerFactory;
+
@Inject
public EntityIndexFactory entityIndexFactory;
@Override
protected AsyncEventService getAsyncEventService() {
- return new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, rxTaskScheduler );
+ return new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, mapManagerFactory, rxTaskScheduler );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
index d34a1a9..2863cbf 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
@@ -189,7 +189,7 @@ public abstract class AsyncIndexServiceTest {
}
try {
- Thread.sleep( 100 );
+ Thread.sleep( 10000 );
}
catch ( InterruptedException e ) {
//swallow
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
index c280fee..80e2d17 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
@@ -33,7 +33,14 @@ public interface MapManager {
/**
* Return the string, null if not found
*/
- public String getString( final String key );
+ String getString( final String key );
+
+ /**
+ * Read the string at a high consistency level. This should ensure data replication has happened
+ * @param key
+ * @return
+ */
+ String getStringHighConsistency(final String key);
/**
@@ -41,12 +48,12 @@ public interface MapManager {
* @param keys
* @return
*/
- public Map<String, String> getStrings(final Collection<String> keys);
+ Map<String, String> getStrings( final Collection<String> keys );
/**
* Return the string, null if not found
*/
- public void putString( final String key, final String value );
+ void putString( final String key, final String value );
/**
* The time to live (in seconds) of the string
@@ -54,33 +61,33 @@ public interface MapManager {
* @param value
* @param ttl
*/
- public void putString( final String key, final String value, final int ttl );
+ void putString( final String key, final String value, final int ttl );
/**
* Return the uuid, null if not found
*/
- public UUID getUuid( final String key );
+ UUID getUuid( final String key );
/**
* Return the uuid, null if not found
*/
- public void putUuid( final String key, final UUID putUuid );
+ void putUuid( final String key, final UUID putUuid );
/**
* Return the long, null if not found
*/
- public Long getLong( final String key );
+ Long getLong( final String key );
/**
* Return the long, null if not found
*/
- public void putLong( final String key, final Long value );
+ void putLong( final String key, final Long value );
/**
* Delete the key
*
* @param key The key used to delete the entry
*/
- public void delete( final String key );
+ void delete( final String key );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
index fb2e7ff..501ade7 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
@@ -53,6 +53,12 @@ public class MapManagerImpl implements MapManager {
@Override
+ public String getStringHighConsistency( final String key ) {
+ return mapSerialization.getStringHighConsistency(scope, key);
+ }
+
+
+ @Override
public Map<String, String> getStrings( final Collection<String> keys ) {
return mapSerialization.getStrings( scope, keys );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
index 2e958c2..e9c21d2 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
@@ -32,50 +32,59 @@ public interface MapSerialization extends Migration {
/**
* Return the string, null if not found
*/
- public String getString( final MapScope scope, final String key );
+ String getString( final MapScope scope, final String key );
+
+
+ /**
+ * Get the key from all regions with a high consistency
+ * @param scope
+ * @param key
+ * @return
+ */
+ String getStringHighConsistency( final MapScope scope, final String key );
/**
* Get strings from the map
* @param keys
* @return
*/
- public Map<String, String> getStrings( final MapScope scope, final Collection<String> keys );
+ Map<String, String> getStrings( final MapScope scope, final Collection<String> keys );
/**
* Return the string, null if not found
*/
- public void putString( final MapScope scope, final String key, final String value );
+ void putString( final MapScope scope, final String key, final String value );
/**
* Write the string
*/
- public void putString( final MapScope scope, final String key, final String value, final int ttl );
+ void putString( final MapScope scope, final String key, final String value, final int ttl );
/**
* Return the uuid, null if not found
*/
- public UUID getUuid( final MapScope scope, final String key );
+ UUID getUuid( final MapScope scope, final String key );
/**
* Return the uuid, null if not found
*/
- public void putUuid( final MapScope scope, final String key, final UUID putUuid );
+ void putUuid( final MapScope scope, final String key, final UUID putUuid );
/**
* Return the long, null if not found
*/
- public Long getLong( final MapScope scope, final String key );
+ Long getLong( final MapScope scope, final String key );
/**
* Return the long, null if not found
*/
- public void putLong( final MapScope scope, final String key, final Long value );
+ void putLong( final MapScope scope, final String key, final Long value );
/**
* Delete the key
*
* @param key The key used to delete the entry
*/
- public void delete( final MapScope scope, final String key );
+ void delete( final MapScope scope, final String key );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
index 825d636..1aa3229 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
@@ -18,6 +18,8 @@
*/
package org.apache.usergrid.persistence.map.impl;
+
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -26,21 +28,21 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
-import com.google.common.base.Preconditions;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.usergrid.persistence.core.astyanax.BucketScopedRowKey;
+import org.apache.usergrid.persistence.core.astyanax.BucketScopedRowKeySerializer;
import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
-import org.apache.usergrid.persistence.core.astyanax.BucketScopedRowKey;
-import org.apache.usergrid.persistence.core.astyanax.BucketScopedRowKeySerializer;
-import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
import org.apache.usergrid.persistence.core.shard.ExpandingShardLocator;
import org.apache.usergrid.persistence.core.shard.StringHashUtils;
import org.apache.usergrid.persistence.map.MapScope;
+import com.google.common.base.Preconditions;
import com.google.common.hash.Funnel;
import com.google.common.hash.PrimitiveSink;
import com.google.inject.Inject;
@@ -53,9 +55,9 @@ import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.model.CompositeBuilder;
import com.netflix.astyanax.model.CompositeParser;
+import com.netflix.astyanax.model.ConsistencyLevel;
import com.netflix.astyanax.model.Row;
import com.netflix.astyanax.model.Rows;
-import com.netflix.astyanax.query.ColumnFamilyQuery;
import com.netflix.astyanax.serializers.BooleanSerializer;
import com.netflix.astyanax.serializers.StringSerializer;
@@ -65,41 +67,40 @@ public class MapSerializationImpl implements MapSerialization {
private static final MapKeySerializer KEY_SERIALIZER = new MapKeySerializer();
- private static final BucketScopedRowKeySerializer<String> MAP_KEY_SERIALIZER =
- new BucketScopedRowKeySerializer<>( KEY_SERIALIZER );
+ private static final BucketScopedRowKeySerializer<String> MAP_KEY_SERIALIZER =
+ new BucketScopedRowKeySerializer<>( KEY_SERIALIZER );
- private static final MapEntrySerializer ENTRY_SERIALIZER = new MapEntrySerializer();
- private static final ScopedRowKeySerializer<MapEntryKey> MAP_ENTRY_SERIALIZER =
- new ScopedRowKeySerializer<>( ENTRY_SERIALIZER );
+ private static final MapEntrySerializer ENTRY_SERIALIZER = new MapEntrySerializer();
+ private static final ScopedRowKeySerializer<MapEntryKey> MAP_ENTRY_SERIALIZER =
+ new ScopedRowKeySerializer<>( ENTRY_SERIALIZER );
- private static final BooleanSerializer BOOLEAN_SERIALIZER = BooleanSerializer.get();
+ private static final BooleanSerializer BOOLEAN_SERIALIZER = BooleanSerializer.get();
- private static final StringSerializer STRING_SERIALIZER = StringSerializer.get();
+ private static final StringSerializer STRING_SERIALIZER = StringSerializer.get();
private static final StringResultsBuilder STRING_RESULTS_BUILDER = new StringResultsBuilder();
- /**
- * CFs where the row key contains the source node id
- */
- public static final MultiTennantColumnFamily<ScopedRowKey<MapEntryKey>, Boolean>
- MAP_ENTRIES = new MultiTennantColumnFamily<>(
- "Map_Entries", MAP_ENTRY_SERIALIZER, BOOLEAN_SERIALIZER );
+ /**
+ * CFs where the row key contains the source node id
+ */
+ public static final MultiTennantColumnFamily<ScopedRowKey<MapEntryKey>, Boolean> MAP_ENTRIES =
+ new MultiTennantColumnFamily<>( "Map_Entries", MAP_ENTRY_SERIALIZER, BOOLEAN_SERIALIZER );
- /**
- * CFs where the row key contains the source node id
- */
- public static final MultiTennantColumnFamily<BucketScopedRowKey<String>, String> MAP_KEYS =
- new MultiTennantColumnFamily<>( "Map_Keys", MAP_KEY_SERIALIZER, STRING_SERIALIZER );
+ /**
+ * CFs where the row key contains the source node id
+ */
+ public static final MultiTennantColumnFamily<BucketScopedRowKey<String>, String> MAP_KEYS =
+ new MultiTennantColumnFamily<>( "Map_Keys", MAP_KEY_SERIALIZER, STRING_SERIALIZER );
/**
* Number of buckets to hash across.
*/
- private static final int[] NUM_BUCKETS = {20};
+ private static final int[] NUM_BUCKETS = { 20 };
/**
* How to funnel keys for buckets
@@ -107,7 +108,6 @@ public class MapSerializationImpl implements MapSerialization {
private static final Funnel<String> MAP_KEY_FUNNEL = new Funnel<String>() {
-
@Override
public void funnel( final String key, final PrimitiveSink into ) {
into.putString( key, StringHashUtils.UTF8 );
@@ -117,8 +117,8 @@ public class MapSerializationImpl implements MapSerialization {
/**
* Locator to get us all buckets
*/
- private static final ExpandingShardLocator<String>
- BUCKET_LOCATOR = new ExpandingShardLocator<>(MAP_KEY_FUNNEL, NUM_BUCKETS);
+ private static final ExpandingShardLocator<String> BUCKET_LOCATOR =
+ new ExpandingShardLocator<>( MAP_KEY_FUNNEL, NUM_BUCKETS );
private final Keyspace keyspace;
@@ -129,13 +129,20 @@ public class MapSerializationImpl implements MapSerialization {
@Override
public String getString( final MapScope scope, final String key ) {
- Column<Boolean> col = getValue(scope, key); // TODO: why boolean?
- return (col !=null) ? col.getStringValue(): null;
+ Column<Boolean> col = getValue( scope, key );
+ return ( col != null ) ? col.getStringValue() : null;
}
@Override
- public Map<String, String> getStrings(final MapScope scope, final Collection<String> keys ) {
+ public String getStringHighConsistency( final MapScope scope, final String key ) {
+ Column<Boolean> col = getValueHighConsistency( scope, key ); // TODO: why boolean?
+ return ( col != null ) ? col.getStringValue() : null;
+ }
+
+
+ @Override
+ public Map<String, String> getStrings( final MapScope scope, final Collection<String> keys ) {
return getValues( scope, keys, STRING_RESULTS_BUILDER );
}
@@ -144,13 +151,13 @@ public class MapSerializationImpl implements MapSerialization {
public void putString( final MapScope scope, final String key, final String value ) {
final RowOp op = new RowOp() {
@Override
- public void putValue(final ColumnListMutation<Boolean> columnListMutation ) {
+ public void putValue( final ColumnListMutation<Boolean> columnListMutation ) {
columnListMutation.putColumn( true, value );
}
@Override
- public void putKey(final ColumnListMutation<String> keysMutation ) {
+ public void putKey( final ColumnListMutation<String> keysMutation ) {
keysMutation.putColumn( key, true );
}
};
@@ -184,10 +191,6 @@ public class MapSerializationImpl implements MapSerialization {
/**
* Write our string index with the specified row op
- * @param scope
- * @param key
- * @param value
- * @param rowOp
*/
private void writeString( final MapScope scope, final String key, final String value, final RowOp rowOp ) {
@@ -225,10 +228,11 @@ public class MapSerializationImpl implements MapSerialization {
/**
* Callbacks for performing row operations
*/
- private static interface RowOp{
+ private static interface RowOp {
/**
* Callback to do the row
+ *
* @param columnListMutation The column mutation
*/
void putValue( final ColumnListMutation<Boolean> columnListMutation );
@@ -236,104 +240,97 @@ public class MapSerializationImpl implements MapSerialization {
/**
* Write the key
- * @param keysMutation
*/
void putKey( final ColumnListMutation<String> keysMutation );
-
-
}
+
@Override
public UUID getUuid( final MapScope scope, final String key ) {
- Column<Boolean> col = getValue(scope, key);
- return (col !=null) ? col.getUUIDValue(): null;
+ Column<Boolean> col = getValue( scope, key );
+ return ( col != null ) ? col.getUUIDValue() : null;
}
@Override
public void putUuid( final MapScope scope, final String key, final UUID putUuid ) {
- Preconditions.checkNotNull(scope, "mapscope is required");
+ Preconditions.checkNotNull( scope, "mapscope is required" );
Preconditions.checkNotNull( key, "key is required" );
Preconditions.checkNotNull( putUuid, "value is required" );
final MutationBatch batch = keyspace.prepareMutationBatch();
//add it to the entry
- final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey(scope, key);
+ final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
//serialize to the entry
- batch.withRow(MAP_ENTRIES, entryRowKey).putColumn(true, putUuid);
+ batch.withRow( MAP_ENTRIES, entryRowKey ).putColumn( true, putUuid );
//add it to the keys
final int bucket = BUCKET_LOCATOR.getCurrentBucket( key );
- final BucketScopedRowKey< String> keyRowKey =
- BucketScopedRowKey.fromKey( scope.getApplication(), key, bucket);
+ final BucketScopedRowKey<String> keyRowKey = BucketScopedRowKey.fromKey( scope.getApplication(), key, bucket );
//serialize to the entry
- batch.withRow(MAP_KEYS, keyRowKey).putColumn(key, true);
-
- executeBatch(batch);
+ batch.withRow( MAP_KEYS, keyRowKey ).putColumn( key, true );
+ executeBatch( batch );
}
@Override
public Long getLong( final MapScope scope, final String key ) {
- Column<Boolean> col = getValue(scope, key);
- return (col !=null) ? col.getLongValue(): null;
+ Column<Boolean> col = getValue( scope, key );
+ return ( col != null ) ? col.getLongValue() : null;
}
-
-
@Override
public void putLong( final MapScope scope, final String key, final Long value ) {
- Preconditions.checkNotNull(scope, "mapscope is required");
+ Preconditions.checkNotNull( scope, "mapscope is required" );
Preconditions.checkNotNull( key, "key is required" );
Preconditions.checkNotNull( value, "value is required" );
final MutationBatch batch = keyspace.prepareMutationBatch();
//add it to the entry
- final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey(scope, key);
+ final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
//serialize to the entry
- batch.withRow(MAP_ENTRIES, entryRowKey).putColumn(true, value);
+ batch.withRow( MAP_ENTRIES, entryRowKey ).putColumn( true, value );
//add it to the keys
final int bucket = BUCKET_LOCATOR.getCurrentBucket( key );
- final BucketScopedRowKey< String> keyRowKey =
- BucketScopedRowKey.fromKey( scope.getApplication(), key, bucket);
+ final BucketScopedRowKey<String> keyRowKey = BucketScopedRowKey.fromKey( scope.getApplication(), key, bucket );
//serialize to the entry
- batch.withRow(MAP_KEYS, keyRowKey).putColumn(key, true);
+ batch.withRow( MAP_KEYS, keyRowKey ).putColumn( key, true );
- executeBatch(batch);
+ executeBatch( batch );
}
@Override
public void delete( final MapScope scope, final String key ) {
final MutationBatch batch = keyspace.prepareMutationBatch();
- final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey(scope, key);
+ final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
//serialize to the entry
- batch.withRow(MAP_ENTRIES, entryRowKey).delete();
+ batch.withRow( MAP_ENTRIES, entryRowKey ).delete();
//add it to the keys, we're not sure which one it may have come from
- final int[] buckets = BUCKET_LOCATOR.getAllBuckets( key );
+ final int[] buckets = BUCKET_LOCATOR.getAllBuckets( key );
- final List<BucketScopedRowKey<String>>
- rowKeys = BucketScopedRowKey.fromRange( scope.getApplication(), key, buckets );
+ final List<BucketScopedRowKey<String>> rowKeys =
+ BucketScopedRowKey.fromRange( scope.getApplication(), key, buckets );
- for(BucketScopedRowKey<String> rowKey: rowKeys) {
+ for ( BucketScopedRowKey<String> rowKey : rowKeys ) {
batch.withRow( MAP_KEYS, rowKey ).deleteColumn( key );
}
@@ -345,34 +342,53 @@ public class MapSerializationImpl implements MapSerialization {
public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() {
final MultiTennantColumnFamilyDefinition mapEntries =
- new MultiTennantColumnFamilyDefinition( MAP_ENTRIES,
- BytesType.class.getSimpleName(),
- BytesType.class.getSimpleName(),
- BytesType.class.getSimpleName(),
- MultiTennantColumnFamilyDefinition.CacheOption.KEYS );
+ new MultiTennantColumnFamilyDefinition( MAP_ENTRIES, BytesType.class.getSimpleName(),
+ BytesType.class.getSimpleName(), BytesType.class.getSimpleName(),
+ MultiTennantColumnFamilyDefinition.CacheOption.KEYS );
final MultiTennantColumnFamilyDefinition mapKeys =
- new MultiTennantColumnFamilyDefinition( MAP_KEYS,
- BytesType.class.getSimpleName(),
- UTF8Type.class.getSimpleName(),
- BytesType.class.getSimpleName(),
- MultiTennantColumnFamilyDefinition.CacheOption.KEYS );
+ new MultiTennantColumnFamilyDefinition( MAP_KEYS, BytesType.class.getSimpleName(),
+ UTF8Type.class.getSimpleName(), BytesType.class.getSimpleName(),
+ MultiTennantColumnFamilyDefinition.CacheOption.KEYS );
return Arrays.asList( mapEntries, mapKeys );
}
- private Column<Boolean> getValue(MapScope scope, String key) {
+ private Column<Boolean> getValue( MapScope scope, String key ) {
+
+
+ //add it to the entry
+ final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
+
+ //now get all columns, including the "old row key value"
+ try {
+ final Column<Boolean> result =
+ keyspace.prepareQuery( MAP_ENTRIES ).getKey( entryRowKey ).getColumn( true ).execute().getResult();
+
+ return result;
+ }
+ catch ( NotFoundException nfe ) {
+ //nothing to return
+ return null;
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to connect to cassandra", e );
+ }
+ }
+
+ private Column<Boolean> getValueHighConsistency( MapScope scope, String key ) {
//add it to the entry
- final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey(scope, key);
+ final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
//now get all columns, including the "old row key value"
try {
- final Column<Boolean> result = keyspace.prepareQuery( MAP_ENTRIES )
- .getKey( entryRowKey ).getColumn( true ).execute().getResult();
+ final Column<Boolean> result =
+ keyspace.prepareQuery( MAP_ENTRIES ).setConsistencyLevel( ConsistencyLevel.CL_QUORUM )
+ .getKey( entryRowKey ).getColumn( true ).execute().getResult();
return result;
}
@@ -388,52 +404,45 @@ public class MapSerializationImpl implements MapSerialization {
/**
* Get multiple values, using the string builder
- * @param scope
- * @param keys
- * @param builder
- * @param <T>
- * @return
*/
- private <T> T getValues(final MapScope scope, final Collection<String> keys, final ResultsBuilder<T> builder) {
+ private <T> T getValues( final MapScope scope, final Collection<String> keys, final ResultsBuilder<T> builder ) {
final List<ScopedRowKey<MapEntryKey>> rowKeys = new ArrayList<>( keys.size() );
- for(final String key: keys){
- //add it to the entry
- final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey(scope, key);
+ for ( final String key : keys ) {
+ //add it to the entry
+ final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
rowKeys.add( entryRowKey );
-
}
+ //now get all columns, including the "old row key value"
+ try {
+ final Rows<ScopedRowKey<MapEntryKey>, Boolean> rows =
+ keyspace.prepareQuery( MAP_ENTRIES ).getKeySlice( rowKeys ).withColumnSlice( true ).execute()
+ .getResult();
- //now get all columns, including the "old row key value"
- try {
- final Rows<ScopedRowKey<MapEntryKey>, Boolean>
- rows = keyspace.prepareQuery( MAP_ENTRIES ).getKeySlice( rowKeys ).withColumnSlice( true )
- .execute().getResult();
-
-
- return builder.buildResults( rows );
- }
- catch ( NotFoundException nfe ) {
- //nothing to return
- return null;
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to connect to cassandra", e );
- }
- }
+ return builder.buildResults( rows );
+ }
+ catch ( NotFoundException nfe ) {
+ //nothing to return
+ return null;
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to connect to cassandra", e );
+ }
+ }
- private void executeBatch(MutationBatch batch) {
+ private void executeBatch( MutationBatch batch ) {
try {
batch.execute();
- } catch (ConnectionException e) {
- throw new RuntimeException("Unable to connect to cassandra", e);
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to connect to cassandra", e );
}
}
@@ -501,8 +510,7 @@ public class MapSerializationImpl implements MapSerialization {
/**
* Create a scoped row key from the key
*/
- public static ScopedRowKey<MapEntryKey> fromKey(
- final MapScope mapScope, final String key ) {
+ public static ScopedRowKey<MapEntryKey> fromKey( final MapScope mapScope, final String key ) {
return ScopedRowKey.fromKey( mapScope.getApplication(), new MapEntryKey( mapScope.getName(), key ) );
}
@@ -511,32 +519,32 @@ public class MapSerializationImpl implements MapSerialization {
/**
* Build the results from the row keys
- * @param <T>
*/
private static interface ResultsBuilder<T> {
- public T buildResults(final Rows<ScopedRowKey<MapEntryKey>, Boolean> rows);
+ public T buildResults( final Rows<ScopedRowKey<MapEntryKey>, Boolean> rows );
}
- public static class StringResultsBuilder implements ResultsBuilder<Map<String, String>>{
+
+ public static class StringResultsBuilder implements ResultsBuilder<Map<String, String>> {
@Override
public Map<String, String> buildResults( final Rows<ScopedRowKey<MapEntryKey>, Boolean> rows ) {
final int size = rows.size();
- final Map<String, String> results = new HashMap<>(size);
+ final Map<String, String> results = new HashMap<>( size );
- for(int i = 0; i < size; i ++){
+ for ( int i = 0; i < size; i++ ) {
final Row<ScopedRowKey<MapEntryKey>, Boolean> row = rows.getRowByIndex( i );
final String value = row.getColumns().getStringValue( true, null );
- if(value == null){
+ if ( value == null ) {
continue;
}
- results.put( row.getKey().getKey().key, value );
+ results.put( row.getKey().getKey().key, value );
}
return results;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java
index 4f47749..4060dac 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java
@@ -31,6 +31,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.index.SearchEdge;
import org.apache.usergrid.persistence.model.entity.Id;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createIndexDocId;
@@ -42,7 +43,10 @@ import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createInd
@JsonTypeInfo( use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class" )
public class DeIndexOperation implements BatchOperation {
+ @JsonProperty
private String[] indexes;
+
+ @JsonProperty
private String documentId;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperation.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperation.java
index fae809f..28f2e0d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperation.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperation.java
@@ -30,6 +30,7 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
+import com.fasterxml.jackson.annotation.JsonProperty;
/**
@@ -37,9 +38,12 @@ import org.elasticsearch.client.Client;
*/
public class IndexOperation implements BatchOperation {
+ @JsonProperty
public String writeAlias;
+ @JsonProperty
public String documentId;
+ @JsonProperty
public Map<String, Object> data;
public IndexOperation( final String writeAlias, final ApplicationScope applicationScope, IndexEdge indexEdge,
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
index 12df390..bcee308 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
@@ -26,6 +26,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
@@ -33,9 +34,13 @@ import com.google.common.base.Optional;
* Container for index operations.
*/
public class IndexOperationMessage implements Serializable {
+ @JsonProperty
private final Set<IndexOperation> indexRequests;
+
+ @JsonProperty
private final Set<DeIndexOperation> deIndexRequests;
+ @JsonProperty
private long creationTime;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
index d974529..5201279 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
@@ -23,6 +23,7 @@ package org.apache.usergrid.persistence.queue;
import rx.Observable;
import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@@ -67,12 +68,21 @@ public class DefaultQueueManager implements QueueManager {
}
}
+
@Override
- public synchronized void sendMessage(Object body) throws IOException {
+ public <T extends Serializable> void sendMessage( final T body ) throws IOException {
String uuid = UUID.randomUUID().toString();
queue.add(new QueueMessage(uuid,"handle_"+uuid,body,"put type here"));
+
}
+
+ @Override
+ public <T extends Serializable> void sendMessageToTopic( final T body ) throws IOException {
+ sendMessage( body );
+ }
+
+
@Override
public void deleteQueue() {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
index 027abb2..dc3d1b5 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
@@ -68,7 +68,13 @@ public interface QueueManager {
* @param body
* @throws IOException
*/
- void sendMessage(Object body)throws IOException;
+ <T extends Serializable> void sendMessage(T body)throws IOException;
+
+ /**
+ * Send a message to the topic to be sent to other queues
+ * @param body
+ */
+ <T extends Serializable> void sendMessageToTopic(T body) throws IOException;
/**
* purge messages
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index d476f76..59ecd24 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -18,15 +18,55 @@
package org.apache.usergrid.persistence.queue.impl;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
+import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory;
+import org.apache.usergrid.persistence.core.guicyfig.ClusterFig;
+import org.apache.usergrid.persistence.queue.Queue;
+import org.apache.usergrid.persistence.queue.QueueFig;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueMessage;
+import org.apache.usergrid.persistence.queue.QueueScope;
+import org.apache.usergrid.persistence.queue.util.AmazonNotificationUtils;
+
import com.amazonaws.AmazonServiceException;
import com.amazonaws.handlers.AsyncHandler;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sns.AmazonSNSAsyncClient;
import com.amazonaws.services.sns.AmazonSNSClient;
-import com.amazonaws.services.sns.model.*;
+import com.amazonaws.services.sns.model.PublishRequest;
+import com.amazonaws.services.sns.model.PublishResult;
+import com.amazonaws.services.sns.model.SubscribeRequest;
+import com.amazonaws.services.sns.model.SubscribeResult;
+import com.amazonaws.services.sqs.AmazonSQSAsyncClient;
import com.amazonaws.services.sqs.AmazonSQSClient;
-import com.amazonaws.services.sqs.model.*;
+import com.amazonaws.services.sqs.model.BatchResultErrorEntry;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
+import com.amazonaws.services.sqs.model.DeleteMessageRequest;
+import com.amazonaws.services.sqs.model.DeleteQueueRequest;
+import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
+import com.amazonaws.services.sqs.model.GetQueueUrlResult;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import com.amazonaws.services.sqs.model.ReceiveMessageResult;
+import com.amazonaws.services.sqs.model.SendMessageRequest;
+import com.amazonaws.services.sqs.model.SendMessageResult;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -36,20 +76,6 @@ import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
-import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
-import org.apache.usergrid.persistence.core.guicyfig.ClusterFig;
-import org.apache.usergrid.persistence.queue.*;
-import org.apache.usergrid.persistence.queue.Queue;
-import org.apache.usergrid.persistence.queue.util.AmazonNotificationUtils;
-import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
public class SNSQueueManagerImpl implements QueueManager {
@@ -59,10 +85,10 @@ public class SNSQueueManagerImpl implements QueueManager {
private final QueueFig fig;
private final ClusterFig clusterFig;
private final CassandraFig cassandraFig;
- private final QueueFig queueFig;
private final AmazonSQSClient sqs;
private final AmazonSNSClient sns;
private final AmazonSNSAsyncClient snsAsync;
+ private final AmazonSQSAsyncClient sqsAsync;
private final JsonFactory JSON_FACTORY = new JsonFactory();
@@ -110,6 +136,7 @@ public class SNSQueueManagerImpl implements QueueManager {
});
+
@Inject
public SNSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig, ClusterFig clusterFig,
CassandraFig cassandraFig, QueueFig queueFig) {
@@ -117,12 +144,21 @@ public class SNSQueueManagerImpl implements QueueManager {
this.fig = fig;
this.clusterFig = clusterFig;
this.cassandraFig = cassandraFig;
- this.queueFig = queueFig;
+
+
+ // create our own executor which has a bounded queue w/ caller runs policy for rejected tasks
+ final ExecutorService executor = TaskExecutorFactory
+ .createTaskExecutor("amazon-async-io", queueFig.getAsyncMaxThreads(), queueFig.getAsyncQueueSize(),
+ TaskExecutorFactory.RejectionAction.CALLERRUNS);
+
+
+ final Region region = getRegion();
try {
- sqs = createSQSClient(getRegion());
- sns = createSNSClient(getRegion());
- snsAsync = createAsyncSNSClient(getRegion());
+ sqs = createSQSClient(region);
+ sns = createSNSClient(region);
+ snsAsync = createAsyncSNSClient(region, executor);
+ sqsAsync = createAsyncSQSClient( region, executor );
} catch (Exception e) {
throw new RuntimeException("Error setting up mapper", e);
@@ -157,7 +193,7 @@ public class SNSQueueManagerImpl implements QueueManager {
try {
SubscribeRequest primarySubscribeRequest = new SubscribeRequest(primaryTopicArn, "sqs", primaryQueueArn);
- sns.subscribe(primarySubscribeRequest);
+ sns.subscribe(primarySubscribeRequest);
// ensure the SNS primary topic has permission to send to the primary SQS queue
List<String> primaryTopicArnList = new ArrayList<>();
@@ -276,22 +312,35 @@ public class SNSQueueManagerImpl implements QueueManager {
*
*/
- private AmazonSNSAsyncClient createAsyncSNSClient(final Region region) {
+ private AmazonSNSAsyncClient createAsyncSNSClient(final Region region, final ExecutorService executor) {
final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
- // create our own executor which has a bounded queue w/ caller runs policy for rejected tasks
- final Executor executor = TaskExecutorFactory
- .createTaskExecutor("amazon-async-io", queueFig.getAsyncMaxThreads(), queueFig.getAsyncQueueSize(),
- TaskExecutorFactory.RejectionAction.CALLERRUNS);
-
- final AmazonSNSAsyncClient sns = new AmazonSNSAsyncClient(ugProvider.getCredentials(), (ExecutorService) executor);
+ final AmazonSNSAsyncClient sns = new AmazonSNSAsyncClient(ugProvider.getCredentials(), executor);
sns.setRegion(region);
return sns;
}
+
+ /**
+ * Create the async sqs client
+ * @param region
+ * @param executor
+ * @return
+ */
+ private AmazonSQSAsyncClient createAsyncSQSClient(final Region region, final ExecutorService executor){
+ final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
+
+ final AmazonSQSAsyncClient sqs = new AmazonSQSAsyncClient( ugProvider.getCredentials(), executor );
+
+ sqs.setRegion( region );
+
+ return sqs;
+
+ }
+
/**
* The Synchronous SNS client is used for creating topics and subscribing queues.
*
@@ -369,7 +418,12 @@ public class SNSQueueManagerImpl implements QueueManager {
try {
final JsonNode bodyNode = mapper.readTree(message.getBody());
JsonNode bodyObj = bodyNode.has("Message") ? bodyNode.get("Message") : bodyNode;
- body = fromString(bodyObj.textValue(), klass);
+
+
+
+ final String bodyText = mapper.writeValueAsString( bodyObj );;
+
+ body = fromString(bodyText, klass);
} catch (Exception e) {
logger.error(String.format("failed to deserialize message: %s", message.getBody()), e);
throw new RuntimeException(e);
@@ -405,6 +459,40 @@ public class SNSQueueManagerImpl implements QueueManager {
}
}
+
+ @Override
+ public <T extends Serializable> void sendMessageToTopic( final T body ) throws IOException {
+ if (snsAsync == null) {
+ logger.error("SNS client is null, perhaps it failed to initialize successfully");
+ return;
+ }
+
+ final String stringBody = toString(body);
+
+ String topicArn = getWriteTopicArn();
+
+ if (logger.isDebugEnabled()) logger.debug("Publishing Message...{} to arn: {}", stringBody, topicArn);
+
+ PublishRequest publishRequest = new PublishRequest(topicArn, stringBody);
+
+ snsAsync.publishAsync( publishRequest, new AsyncHandler<PublishRequest, PublishResult>() {
+ @Override
+ public void onError( Exception e ) {
+ logger.error( "Error publishing message... {}", e );
+ }
+
+
+ @Override
+ public void onSuccess( PublishRequest request, PublishResult result ) {
+ if ( logger.isDebugEnabled() ) logger
+ .debug( "Successfully published... messageID=[{}], arn=[{}]", result.getMessageId(),
+ request.getTopicArn() );
+ }
+ } );
+
+ }
+
+
@Override
public void sendMessages(final List bodies) throws IOException {
@@ -414,41 +502,47 @@ public class SNSQueueManagerImpl implements QueueManager {
}
for (Object body : bodies) {
- sendMessage(body);
+ sendMessage((Serializable)body);
}
}
+
@Override
- public void sendMessage(final Object body) throws IOException {
+ public <T extends Serializable> void sendMessage( final T body ) throws IOException {
- if (snsAsync == null) {
- logger.error("SNS client is null, perhaps it failed to initialize successfully");
+ if ( snsAsync == null ) {
+ logger.error( "SNS client is null, perhaps it failed to initialize successfully" );
return;
}
- final String stringBody = toString(body);
+ final String stringBody = toString( body );
- String topicArn = getWriteTopicArn();
+ String url = getReadQueue().getUrl();
- if (logger.isDebugEnabled()) logger.debug("Publishing Message...{} to arn: {}", stringBody, topicArn);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "Publishing Message...{} to url: {}", stringBody, url );
+ }
- PublishRequest publishRequest = new PublishRequest(topicArn, stringBody);
+ SendMessageRequest request = new SendMessageRequest( url, stringBody );
- snsAsync.publishAsync(publishRequest, new AsyncHandler<PublishRequest, PublishResult>() {
- @Override
- public void onError(Exception e) {
- logger.error("Error publishing message... {}", e);
- }
+ sqsAsync.sendMessageAsync( request, new AsyncHandler<SendMessageRequest, SendMessageResult>() {
@Override
- public void onSuccess(PublishRequest request, PublishResult result) {
- if (logger.isDebugEnabled())
- logger.debug("Successfully published... messageID=[{}], arn=[{}]", result.getMessageId(), request.getTopicArn());
+ public void onError( final Exception e ) {
+ logger.error( "Error sending message... {}", e );
}
- });
+
+ @Override
+ public void onSuccess( final SendMessageRequest request, final SendMessageResult sendMessageResult ) {
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "Successfully send... messageBody=[{}], url=[{}]", request.getMessageBody(),
+ request.getQueueUrl() );
+ }
+ }
+ } );
}
@Override
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index fa9a7ac..0c56c05 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -19,6 +19,7 @@ package org.apache.usergrid.persistence.queue.impl;
import java.io.IOException;
+import java.io.Serializable;
import java.util.*;
import java.util.concurrent.ExecutionException;
@@ -243,25 +244,34 @@ public class SQSQueueManagerImpl implements QueueManager {
}
+
@Override
- public void sendMessage(final Object body) throws IOException {
+ public <T extends Serializable> void sendMessage( final T body ) throws IOException {
if (sqs == null) {
- logger.error("Sqs is null");
- return;
- }
+ logger.error("Sqs is null");
+ return;
+ }
- String url = getQueue().getUrl();
+ String url = getQueue().getUrl();
- if (logger.isDebugEnabled()) logger.debug("Sending Message...{} to {}", body.toString(), url);
+ if (logger.isDebugEnabled()) logger.debug("Sending Message...{} to {}", body.toString(), url);
- final String stringBody = toString(body);
+ final String stringBody = toString(body);
- SendMessageRequest request = new SendMessageRequest(url, stringBody);
- sqs.sendMessage(request);
+ SendMessageRequest request = new SendMessageRequest(url, stringBody);
+ sqs.sendMessage(request);
+ }
+
+
+
+ @Override
+ public <T extends Serializable> void sendMessageToTopic( final T body ) throws IOException {
+ sendMessage( body );
}
+
@Override
public void commitMessage(final QueueMessage queueMessage) {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
index bca9a49..bc55ff4 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.services.queues;
import java.io.IOException;
+import java.io.Serializable;
import java.util.List;
import org.apache.usergrid.persistence.queue.QueueManager;
@@ -65,7 +66,13 @@ public class ImportQueueManager implements QueueManager {
@Override
- public void sendMessage( final Object body ) throws IOException {
+ public <T extends Serializable> void sendMessage( final T body ) throws IOException {
+
+ }
+
+
+ @Override
+ public <T extends Serializable> void sendMessageToTopic( final T body ) throws IOException {
}