You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2015/10/20 20:02:04 UTC
[01/12] usergrid git commit: Adds strong consistency read to maps.
Persists ES batches into Cassandra for multi region execution.
Repository: usergrid
Updated Branches:
refs/heads/2.1-release a09485a3a -> 1fe1d1a34
Adds strong consistency read to maps. Persists ES batches into Cassandra for multi region execution.
A bug in wiring JSON to SQS still exists: it incorrectly escapes some message subtypes.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/94a90781
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/94a90781
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/94a90781
Branch: refs/heads/2.1-release
Commit: 94a9078125fc32d755e33e562f8e8fd8624641c1
Parents: 2b22c61
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Oct 16 18:02:44 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Oct 16 18:02:44 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 186 +++++++++++---
.../asyncevents/AsyncEventService.java | 1 +
.../asyncevents/AsyncIndexProvider.java | 22 +-
.../asyncevents/model/AsyncEvent.java | 3 +-
.../model/ElasticsearchIndexEvent.java | 50 ++++
.../index/IndexProcessorFig.java | 8 +
.../util/ObjectJsonSerializer.java | 74 ++++++
.../index/AmazonAsyncEventServiceTest.java | 6 +-
.../index/AsyncIndexServiceTest.java | 2 +-
.../usergrid/persistence/map/MapManager.java | 25 +-
.../persistence/map/impl/MapManagerImpl.java | 6 +
.../persistence/map/impl/MapSerialization.java | 27 +-
.../map/impl/MapSerializationImpl.java | 248 ++++++++++---------
.../index/impl/DeIndexOperation.java | 4 +
.../persistence/index/impl/IndexOperation.java | 4 +
.../index/impl/IndexOperationMessage.java | 5 +
.../persistence/queue/DefaultQueueManager.java | 12 +-
.../persistence/queue/QueueManager.java | 8 +-
.../queue/impl/SNSQueueManagerImpl.java | 188 ++++++++++----
.../queue/impl/SQSQueueManagerImpl.java | 28 ++-
.../services/queues/ImportQueueManager.java | 9 +-
21 files changed, 666 insertions(+), 250 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 95126c6..c9f0953 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -21,13 +21,20 @@ package org.apache.usergrid.corepersistence.asyncevents;
import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.google.common.base.Optional;
+
+import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.corepersistence.util.ObjectJsonSerializer;
+import org.apache.usergrid.exception.NotImplementedException;
import org.apache.usergrid.persistence.index.impl.IndexProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,8 +61,13 @@ import org.apache.usergrid.persistence.index.EntityIndex;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.IndexLocationStrategy;
import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+import org.apache.usergrid.persistence.map.MapManager;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.map.MapScope;
+import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import org.apache.usergrid.persistence.queue.QueueManager;
import org.apache.usergrid.persistence.queue.QueueManagerFactory;
import org.apache.usergrid.persistence.queue.QueueMessage;
@@ -82,12 +94,13 @@ public class AmazonAsyncEventService implements AsyncEventService {
private static final Logger logger = LoggerFactory.getLogger(AmazonAsyncEventService.class);
+ private static final ObjectJsonSerializer OBJECT_JSON_SERIALIZER = new ObjectJsonSerializer( );
+
// SQS maximum receive messages is 10
private static final int MAX_TAKE = 10;
public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars
private final QueueManager queue;
- private final QueueScope queueScope;
private final IndexProcessorFig indexProcessorFig;
private final IndexProducer indexProducer;
private final EntityCollectionManagerFactory entityCollectionManagerFactory;
@@ -109,6 +122,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
private final AtomicLong counter = new AtomicLong();
private final AtomicLong inFlight = new AtomicLong();
private final Histogram messageCycle;
+ private final MapManager esMapPersistence;
//the actively running subscription
private List<Subscription> subscriptions = new ArrayList<>();
@@ -123,6 +137,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
final IndexLocationStrategyFactory indexLocationStrategyFactory,
final EntityIndexFactory entityIndexFactory,
final EventBuilder eventBuilder,
+ final MapManagerFactory mapManagerFactory,
final RxTaskScheduler rxTaskScheduler ) {
this.indexProducer = indexProducer;
@@ -130,10 +145,16 @@ public class AmazonAsyncEventService implements AsyncEventService {
this.indexLocationStrategyFactory = indexLocationStrategyFactory;
this.entityIndexFactory = entityIndexFactory;
this.eventBuilder = eventBuilder;
+
+ final MapScope mapScope = new MapScopeImpl( CpNamingUtils.getManagementApplicationId(), "indexEvents");
+
+ this.esMapPersistence = mapManagerFactory.createMapManager( mapScope );
+
this.rxTaskScheduler = rxTaskScheduler;
- this.queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL);
+ QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL);
this.queue = queueManagerFactory.getQueueManager(queueScope);
+
this.indexProcessorFig = indexProcessorFig;
this.writeTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.write");
@@ -158,7 +179,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
/**
* Offer the EntityIdScope to SQS
*/
- private void offer(final Object operation) {
+ private void offer(final Serializable operation) {
final Timer.Context timer = this.writeTimer.time();
try {
@@ -213,7 +234,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
final Timer.Context timer = this.ackTimer.time();
try{
- queue.commitMessage(message);
+ queue.commitMessage( message );
//decrement our in-flight counter
inFlight.decrementAndGet();
@@ -235,7 +256,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
final Timer.Context timer = this.ackTimer.time();
try{
- queue.commitMessages(messages);
+ queue.commitMessages( messages );
//decrement our in-flight counter
inFlight.decrementAndGet();
@@ -296,7 +317,13 @@ public class AmazonAsyncEventService implements AsyncEventService {
handleInitializeApplicationIndex(event, message);
indexoperationObservable = Observable.just(new IndexOperationMessage());
validateEmptySets = false; //do not check this one for an empty set b/c it will be empty.
- } else {
+ } else if (event instanceof ElasticsearchIndexEvent){
+ handleIndexOperation( (ElasticsearchIndexEvent)event );
+ indexoperationObservable = Observable.just( new IndexOperationMessage() );
+ validateEmptySets = false; //do not check this one for an empty set b/c it will be empty.
+ }
+
+ else {
throw new Exception("Unknown EventType");//TODO: print json instead
}
@@ -434,6 +461,85 @@ public class AmazonAsyncEventService implements AsyncEventService {
offer( new EntityDeleteEvent( new EntityIdScope( applicationScope, entityId ) ) );
}
+
+    /**
+     * Queue up an indexOperationMessage for multi region execution.
+     *
+     * The batch is serialized to JSON and stored in the map under a freshly generated time UUID;
+     * an {@link ElasticsearchIndexEvent} carrying only that id is then sent to the topic, and
+     * consumers load the batch by id in {@link #handleIndexOperation(ElasticsearchIndexEvent)}.
+     *
+     * @param indexOperationMessage the batch of index operations to persist and announce
+     * @throws RuntimeException if the event cannot be published to the topic
+     */
+    public void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage ) {
+
+        final String jsonValue = OBJECT_JSON_SERIALIZER.toByteBuffer( indexOperationMessage );
+
+        final UUID newMessageId = UUIDGenerator.newTimeUUID();
+
+        //IndexProcessorFig documents this TTL in milliseconds, while MapManager.putString documents
+        //its ttl argument in seconds. Convert here, and never round down to 0.
+        final int ttlSeconds = Math.max( 1, indexProcessorFig.getIndexMessageTtl() / 1000 );
+
+        //persist the serialized batch in the map so that every consumer can load it by id
+        esMapPersistence.putString( newMessageId.toString(), jsonValue, ttlSeconds );
+
+        //now queue up the index message, it only carries the id of the persisted batch
+        final ElasticsearchIndexEvent elasticsearchIndexEvent = new ElasticsearchIndexEvent( newMessageId );
+
+        //send to the topic so all regions index the batch
+        try {
+            queue.sendMessageToTopic( elasticsearchIndexEvent );
+        }
+        catch ( IOException e ) {
+            throw new RuntimeException( "Unable to publish to topic", e );
+        }
+    }
+
+    /**
+     * Load the index batch referenced by the event from the map and execute it.
+     *
+     * The batch is first read with the default read; if it is not found it is read again with
+     * {@code getStringHighConsistency}. If both reads return null the batch cannot be processed.
+     *
+     * @param elasticsearchIndexEvent the event carrying the id of the persisted batch, must not be null
+     * @throws NullPointerException if the event or its batch id is null
+     * @throws RuntimeException if the batch cannot be found with either read
+     */
+    public void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){
+        Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" );
+
+        final UUID messageId = elasticsearchIndexEvent.getIndexBatchId();
+
+        Preconditions.checkNotNull( messageId, "messageId must not be null" );
+
+        final String mapKey = messageId.toString();
+
+        //try the default read first
+        String message = esMapPersistence.getString( mapKey );
+
+        if ( message == null ) {
+            //pass the id so the {} placeholder is actually filled in
+            logger.error(
+                "Receive message with id {} to process, unable to find it, reading with higher consistency level",
+                messageId );
+
+            message = esMapPersistence.getStringHighConsistency( mapKey );
+        }
+
+        //we couldn't find it with either read, bail
+        if ( message == null ) {
+            logger.error( "Unable to find the ES batch with id {} to process at a higher consistency level",
+                messageId );
+
+            throw new RuntimeException( "Unable to find the ES batch to process with message id " + messageId );
+        }
+
+        //read the value from the string
+        final IndexOperationMessage indexOperationMessage =
+            OBJECT_JSON_SERIALIZER.fromString( message, IndexOperationMessage.class );
+
+        //now execute it
+        //NOTE(review): last() throws if the observable completes without emitting; the call this
+        //replaces in the batch submit path used lastOrDefault(null) - confirm put() always emits.
+        indexProducer.put(indexOperationMessage).toBlocking().last();
+    }
+
+
+
@Override
public long getQueueDepth() {
return queue.getQueueDepth();
@@ -510,71 +616,75 @@ public class AmazonAsyncEventService implements AsyncEventService {
synchronized (mutex) {
Observable<List<QueueMessage>> consumer =
- Observable.create(new Observable.OnSubscribe<List<QueueMessage>>() {
+ Observable.create( new Observable.OnSubscribe<List<QueueMessage>>() {
@Override
- public void call(final Subscriber<? super List<QueueMessage>> subscriber) {
+ public void call( final Subscriber<? super List<QueueMessage>> subscriber ) {
//name our thread so it's easy to see
- Thread.currentThread().setName("QueueConsumer_" + counter.incrementAndGet());
+ Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() );
List<QueueMessage> drainList = null;
do {
try {
- drainList = take().toList().toBlocking().lastOrDefault(null);
+ drainList = take().toList().toBlocking().lastOrDefault( null );
//emit our list in it's entity to hand off to a worker pool
- subscriber.onNext(drainList);
+ subscriber.onNext( drainList );
//take since we're in flight
- inFlight.addAndGet(drainList.size());
- } catch (Throwable t) {
+ inFlight.addAndGet( drainList.size() );
+ }
+ catch ( Throwable t ) {
final long sleepTime = indexProcessorFig.getFailureRetryTime();
- logger.error("Failed to dequeue. Sleeping for {} milliseconds", sleepTime, t);
+ logger.error( "Failed to dequeue. Sleeping for {} milliseconds", sleepTime, t );
- if (drainList != null) {
- inFlight.addAndGet(-1 * drainList.size());
+ if ( drainList != null ) {
+ inFlight.addAndGet( -1 * drainList.size() );
}
try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException ie) {
+ Thread.sleep( sleepTime );
+ }
+ catch ( InterruptedException ie ) {
//swallow
}
indexErrorCounter.inc();
}
}
- while (true);
+ while ( true );
}
- })
+ } )
//this won't block our read loop, just reads and proceeds
- .map(messages ->
- {
- if (messages == null || messages.size() == 0) {
+ .map( messages -> {
+ if ( messages == null || messages.size() == 0 ) {
return null;
}
try {
- List<IndexEventResult> indexEventResults = callEventHandlers(messages);
- List<QueueMessage> messagesToAck = submitToIndex(indexEventResults);
- if (messagesToAck == null || messagesToAck.size() == 0) {
- logger.error("No messages came back from the queue operation should have seen "+messages.size(),messages);
+ List<IndexEventResult> indexEventResults = callEventHandlers( messages );
+ List<QueueMessage> messagesToAck = submitToIndex( indexEventResults );
+ if ( messagesToAck == null || messagesToAck.size() == 0 ) {
+ logger.error( "No messages came back from the queue operation should have seen "
+ + messages.size(), messages );
return messagesToAck;
}
- if(messagesToAck.size()<messages.size()){
- logger.error("Missing messages from queue post operation",messages,messagesToAck);
+ if ( messagesToAck.size() < messages.size() ) {
+ logger.error( "Missing messages from queue post operation", messages,
+ messagesToAck );
}
//ack each message, but only if we didn't error.
- ack(messagesToAck);
+ ack( messagesToAck );
return messagesToAck;
- } catch (Exception e) {
- logger.error("failed to ack messages to sqs", e);
+ }
+ catch ( Exception e ) {
+ logger.error( "failed to ack messages to sqs", e );
return null;
//do not rethrow so we can process all of them
}
- });
+ } );
//start in the background
@@ -619,12 +729,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
//send the batch
//TODO: should retry?
- try {
- indexProducer.put(combined).toBlocking().lastOrDefault(null);
- }catch (Exception e){
- logger.error("Failed to submit to index producer",e);
- throw e;
- }
+ queueIndexOperationMessage( combined );
+
return messagesToAck;
}
@@ -671,4 +777,6 @@ public class AmazonAsyncEventService implements AsyncEventService {
return creationTime;
}
}
+
+
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index ae5688c..dcfffcb 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -25,6 +25,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.entities.Application;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.index.IndexLocationStrategy;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index e9e36f0..3865ecb 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -28,6 +28,7 @@ import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.impl.IndexProducer;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
import org.apache.usergrid.persistence.queue.QueueManagerFactory;
import com.google.inject.Inject;
@@ -51,20 +52,18 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
private final IndexLocationStrategyFactory indexLocationStrategyFactory;
private final EntityIndexFactory entityIndexFactory;
private final IndexProducer indexProducer;
+ private final MapManagerFactory mapManagerFactory;
private AsyncEventService asyncEventService;
@Inject
- public AsyncIndexProvider(final IndexProcessorFig indexProcessorFig,
- final QueueManagerFactory queueManagerFactory,
- final MetricsFactory metricsFactory,
- final RxTaskScheduler rxTaskScheduler,
- final EntityCollectionManagerFactory entityCollectionManagerFactory,
- final EventBuilder eventBuilder,
- final IndexLocationStrategyFactory indexLocationStrategyFactory,
- final EntityIndexFactory entityIndexFactory,
- final IndexProducer indexProducer) {
+ public AsyncIndexProvider( final IndexProcessorFig indexProcessorFig, final QueueManagerFactory queueManagerFactory,
+ final MetricsFactory metricsFactory, final RxTaskScheduler rxTaskScheduler, final
+ EntityCollectionManagerFactory entityCollectionManagerFactory,
+ final EventBuilder eventBuilder, final IndexLocationStrategyFactory indexLocationStrategyFactory,
+ final EntityIndexFactory entityIndexFactory, final IndexProducer indexProducer,
+ final MapManagerFactory mapManagerFactory ) {
this.indexProcessorFig = indexProcessorFig;
this.queueManagerFactory = queueManagerFactory;
@@ -75,6 +74,7 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
this.indexLocationStrategyFactory = indexLocationStrategyFactory;
this.entityIndexFactory = entityIndexFactory;
this.indexProducer = indexProducer;
+ this.mapManagerFactory = mapManagerFactory;
}
@@ -99,10 +99,10 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
return new InMemoryAsyncEventService(eventBuilder, rxTaskScheduler, indexProducer,indexProcessorFig.resolveSynchronously());
case SQS:
return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
- entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler );
+ entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, mapManagerFactory, rxTaskScheduler );
case SNS:
return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
- entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler );
+ entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, mapManagerFactory, rxTaskScheduler );
default:
throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed");
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
index 6b45297..1af54e3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
@@ -39,7 +39,8 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
@JsonSubTypes.Type( value = EdgeIndexEvent.class, name = "edgeIndexEvent" ),
@JsonSubTypes.Type( value = EntityDeleteEvent.class, name = "entityDeleteEvent" ),
@JsonSubTypes.Type( value = EntityIndexEvent.class, name = "entityIndexEvent" ),
- @JsonSubTypes.Type( value = InitializeApplicationIndexEvent.class, name = "initializeApplicationIndexEvent" )
+ @JsonSubTypes.Type( value = InitializeApplicationIndexEvent.class, name = "initializeApplicationIndexEvent" ),
+ @JsonSubTypes.Type( value = ElasticsearchIndexEvent.class, name = "elasticsearchIndexEvent" )
} )
public abstract class AsyncEvent implements Serializable {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java
new file mode 100644
index 0000000..207b15e
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents.model;
+
+
+import java.util.UUID;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+
+/**
+ * An index event for publishing to elastic search. The event carries only the id of an index
+ * batch (see getIndexBatchId()), not the batch contents.
+ */
+public final class ElasticsearchIndexEvent extends AsyncEvent {
+
+
+ // id of the index batch this event refers to
+ @JsonProperty
+ protected UUID indexBatchId;
+
+ // no-arg constructor leaves indexBatchId null; presumably required for JSON deserialization - confirm
+ public ElasticsearchIndexEvent() {
+ }
+
+ // creates an event that refers to the batch with the given id
+ public ElasticsearchIndexEvent( UUID indexBatchId ) {
+ this.indexBatchId = indexBatchId;
+ }
+
+
+ /**
+ * Get the unique id of the index batch this event refers to.
+ * @return the batch id, or null if the event was created with the no-arg constructor and never populated
+ */
+ public UUID getIndexBatchId() {
+ return indexBatchId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index 7d022e5..6fd73b4 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -103,4 +103,12 @@ public interface IndexProcessorFig extends GuicyFig {
@Default("false")
@Key("elasticsearch.queue_impl.resolution")
boolean resolveSynchronously();
+
+ /**
+ * Get the message TTL in milliseconds. The default of 604800000 ms is 7 days.
+ * Read from the "elasticsearch.message.ttl" key.
+ *
+ * NOTE(review): MapManager.putString documents its ttl argument in seconds, so a caller handing
+ * this value to it needs to convert units - confirm at each call site.
+ * @return the configured message TTL
+ */
+ @Default("604800000")
+ @Key( "elasticsearch.message.ttl" )
+ int getIndexMessageTtl();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java
new file mode 100644
index 0000000..dbd5ca3
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.util;
+
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Utility that converts {@link Serializable} objects to JSON strings and back again.
+ *
+ * The underlying mapper is configured with default typing for
+ * {@code ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT}, using "@class" as the type property name.
+ */
+public final class ObjectJsonSerializer {
+
+    /** Mapper shared by both directions; default typing is switched on in the constructor. */
+    private final ObjectMapper objectMapper = new ObjectMapper( new JsonFactory() );
+
+
+    public ObjectJsonSerializer( ) {
+        objectMapper.enableDefaultTypingAsProperty( ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class" );
+    }
+
+
+    /**
+     * Serialize the given object to its JSON representation. Despite the method name, the result
+     * is a {@link String}, not a byte buffer.
+     *
+     * @param toSerialize the object to write, must not be null
+     * @return the JSON string for the object
+     * @throws RuntimeException wrapping the JsonProcessingException if the object cannot be written
+     */
+    public <T extends Serializable> String toByteBuffer( final T toSerialize ) {
+
+        Preconditions.checkNotNull( toSerialize, "toSerialize must not be null" );
+
+        try {
+            return objectMapper.writeValueAsString( toSerialize );
+        }
+        catch ( JsonProcessingException jpe ) {
+            throw new RuntimeException( "Unable to serialize entity", jpe );
+        }
+    }
+
+
+    /**
+     * Parse a JSON string back into an instance of the requested class.
+     *
+     * @param value the JSON text to read, must not be null
+     * @param toSerialize the class to instantiate from the JSON
+     * @return the parsed instance
+     * @throws RuntimeException wrapping the IOException if the value cannot be read
+     */
+    public <T extends Serializable> T fromString( final String value, final Class<T> toSerialize ) {
+
+        Preconditions.checkNotNull( value, "value must not be null" );
+
+        final T parsed;
+
+        try {
+            parsed = objectMapper.readValue( value, toSerialize );
+        }
+        catch ( IOException e ) {
+            throw new RuntimeException( "Unable to deserialize", e );
+        }
+
+        return parsed;
+    }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
index a14437c..e83d6f8 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
@@ -34,6 +34,7 @@ import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.core.test.UseModules;
import org.apache.usergrid.persistence.index.impl.EsRunner;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
import org.apache.usergrid.persistence.queue.QueueManagerFactory;
import com.google.inject.Inject;
@@ -79,13 +80,16 @@ public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest {
@Inject
public IndexLocationStrategyFactory indexLocationStrategyFactory;
+ @Inject
+ public MapManagerFactory mapManagerFactory;
+
@Inject
public EntityIndexFactory entityIndexFactory;
@Override
protected AsyncEventService getAsyncEventService() {
- return new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, rxTaskScheduler );
+ return new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, mapManagerFactory, rxTaskScheduler );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
index d34a1a9..2863cbf 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
@@ -189,7 +189,7 @@ public abstract class AsyncIndexServiceTest {
}
try {
- Thread.sleep( 100 );
+ Thread.sleep( 10000 );
}
catch ( InterruptedException e ) {
//swallow
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
index c280fee..80e2d17 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
@@ -33,7 +33,14 @@ public interface MapManager {
/**
* Return the string, null if not found
*/
- public String getString( final String key );
+ String getString( final String key );
+
+ /**
+ * Read the string at a high consistency level. This should ensure data replication has happened
+ * @param key
+ * @return
+ */
+ String getStringHighConsistency(final String key);
/**
@@ -41,12 +48,12 @@ public interface MapManager {
* @param keys
* @return
*/
- public Map<String, String> getStrings(final Collection<String> keys);
+ Map<String, String> getStrings( final Collection<String> keys );
/**
* Return the string, null if not found
*/
- public void putString( final String key, final String value );
+ void putString( final String key, final String value );
/**
* The time to live (in seconds) of the string
@@ -54,33 +61,33 @@ public interface MapManager {
* @param value
* @param ttl
*/
- public void putString( final String key, final String value, final int ttl );
+ void putString( final String key, final String value, final int ttl );
/**
* Return the uuid, null if not found
*/
- public UUID getUuid( final String key );
+ UUID getUuid( final String key );
/**
* Put the uuid value for the given key
*/
- public void putUuid( final String key, final UUID putUuid );
+ void putUuid( final String key, final UUID putUuid );
/**
* Return the long, null if not found
*/
- public Long getLong( final String key );
+ Long getLong( final String key );
/**
* Put the long value for the given key
*/
- public void putLong( final String key, final Long value );
+ void putLong( final String key, final Long value );
/**
* Delete the key
*
* @param key The key used to delete the entry
*/
- public void delete( final String key );
+ void delete( final String key );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
index fb2e7ff..501ade7 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
@@ -53,6 +53,12 @@ public class MapManagerImpl implements MapManager {
@Override
+ public String getStringHighConsistency( final String key ) {
+ return mapSerialization.getStringHighConsistency(scope, key);
+ }
+
+
+ @Override
public Map<String, String> getStrings( final Collection<String> keys ) {
return mapSerialization.getStrings( scope, keys );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
index 2e958c2..e9c21d2 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
@@ -32,50 +32,59 @@ public interface MapSerialization extends Migration {
/**
* Return the string, null if not found
*/
- public String getString( final MapScope scope, final String key );
+ String getString( final MapScope scope, final String key );
+
+
+ /**
+ * Get the key from all regions with a high consistency
+ * @param scope
+ * @param key
+ * @return
+ */
+ String getStringHighConsistency( final MapScope scope, final String key );
/**
* Get strings from the map
* @param keys
* @return
*/
- public Map<String, String> getStrings( final MapScope scope, final Collection<String> keys );
+ Map<String, String> getStrings( final MapScope scope, final Collection<String> keys );
/**
* Write the string for the given key in the scope
*/
- public void putString( final MapScope scope, final String key, final String value );
+ void putString( final MapScope scope, final String key, final String value );
/**
* Write the string
*/
- public void putString( final MapScope scope, final String key, final String value, final int ttl );
+ void putString( final MapScope scope, final String key, final String value, final int ttl );
/**
* Return the uuid, null if not found
*/
- public UUID getUuid( final MapScope scope, final String key );
+ UUID getUuid( final MapScope scope, final String key );
/**
* Write the uuid for the given key in the scope
*/
- public void putUuid( final MapScope scope, final String key, final UUID putUuid );
+ void putUuid( final MapScope scope, final String key, final UUID putUuid );
/**
* Return the long, null if not found
*/
- public Long getLong( final MapScope scope, final String key );
+ Long getLong( final MapScope scope, final String key );
/**
* Write the long for the given key in the scope
*/
- public void putLong( final MapScope scope, final String key, final Long value );
+ void putLong( final MapScope scope, final String key, final Long value );
/**
* Delete the key
*
* @param key The key used to delete the entry
*/
- public void delete( final MapScope scope, final String key );
+ void delete( final MapScope scope, final String key );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
index 825d636..1aa3229 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
@@ -18,6 +18,8 @@
*/
package org.apache.usergrid.persistence.map.impl;
+
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -26,21 +28,21 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
-import com.google.common.base.Preconditions;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.usergrid.persistence.core.astyanax.BucketScopedRowKey;
+import org.apache.usergrid.persistence.core.astyanax.BucketScopedRowKeySerializer;
import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
-import org.apache.usergrid.persistence.core.astyanax.BucketScopedRowKey;
-import org.apache.usergrid.persistence.core.astyanax.BucketScopedRowKeySerializer;
-import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
import org.apache.usergrid.persistence.core.shard.ExpandingShardLocator;
import org.apache.usergrid.persistence.core.shard.StringHashUtils;
import org.apache.usergrid.persistence.map.MapScope;
+import com.google.common.base.Preconditions;
import com.google.common.hash.Funnel;
import com.google.common.hash.PrimitiveSink;
import com.google.inject.Inject;
@@ -53,9 +55,9 @@ import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.model.CompositeBuilder;
import com.netflix.astyanax.model.CompositeParser;
+import com.netflix.astyanax.model.ConsistencyLevel;
import com.netflix.astyanax.model.Row;
import com.netflix.astyanax.model.Rows;
-import com.netflix.astyanax.query.ColumnFamilyQuery;
import com.netflix.astyanax.serializers.BooleanSerializer;
import com.netflix.astyanax.serializers.StringSerializer;
@@ -65,41 +67,40 @@ public class MapSerializationImpl implements MapSerialization {
private static final MapKeySerializer KEY_SERIALIZER = new MapKeySerializer();
- private static final BucketScopedRowKeySerializer<String> MAP_KEY_SERIALIZER =
- new BucketScopedRowKeySerializer<>( KEY_SERIALIZER );
+ private static final BucketScopedRowKeySerializer<String> MAP_KEY_SERIALIZER =
+ new BucketScopedRowKeySerializer<>( KEY_SERIALIZER );
- private static final MapEntrySerializer ENTRY_SERIALIZER = new MapEntrySerializer();
- private static final ScopedRowKeySerializer<MapEntryKey> MAP_ENTRY_SERIALIZER =
- new ScopedRowKeySerializer<>( ENTRY_SERIALIZER );
+ private static final MapEntrySerializer ENTRY_SERIALIZER = new MapEntrySerializer();
+ private static final ScopedRowKeySerializer<MapEntryKey> MAP_ENTRY_SERIALIZER =
+ new ScopedRowKeySerializer<>( ENTRY_SERIALIZER );
- private static final BooleanSerializer BOOLEAN_SERIALIZER = BooleanSerializer.get();
+ private static final BooleanSerializer BOOLEAN_SERIALIZER = BooleanSerializer.get();
- private static final StringSerializer STRING_SERIALIZER = StringSerializer.get();
+ private static final StringSerializer STRING_SERIALIZER = StringSerializer.get();
private static final StringResultsBuilder STRING_RESULTS_BUILDER = new StringResultsBuilder();
- /**
- * CFs where the row key contains the source node id
- */
- public static final MultiTennantColumnFamily<ScopedRowKey<MapEntryKey>, Boolean>
- MAP_ENTRIES = new MultiTennantColumnFamily<>(
- "Map_Entries", MAP_ENTRY_SERIALIZER, BOOLEAN_SERIALIZER );
+ /**
+ * CFs where the row key contains the source node id
+ */
+ public static final MultiTennantColumnFamily<ScopedRowKey<MapEntryKey>, Boolean> MAP_ENTRIES =
+ new MultiTennantColumnFamily<>( "Map_Entries", MAP_ENTRY_SERIALIZER, BOOLEAN_SERIALIZER );
- /**
- * CFs where the row key contains the source node id
- */
- public static final MultiTennantColumnFamily<BucketScopedRowKey<String>, String> MAP_KEYS =
- new MultiTennantColumnFamily<>( "Map_Keys", MAP_KEY_SERIALIZER, STRING_SERIALIZER );
+ /**
+ * CFs where the row key contains the source node id
+ */
+ public static final MultiTennantColumnFamily<BucketScopedRowKey<String>, String> MAP_KEYS =
+ new MultiTennantColumnFamily<>( "Map_Keys", MAP_KEY_SERIALIZER, STRING_SERIALIZER );
/**
* Number of buckets to hash across.
*/
- private static final int[] NUM_BUCKETS = {20};
+ private static final int[] NUM_BUCKETS = { 20 };
/**
* How to funnel keys for buckets
@@ -107,7 +108,6 @@ public class MapSerializationImpl implements MapSerialization {
private static final Funnel<String> MAP_KEY_FUNNEL = new Funnel<String>() {
-
@Override
public void funnel( final String key, final PrimitiveSink into ) {
into.putString( key, StringHashUtils.UTF8 );
@@ -117,8 +117,8 @@ public class MapSerializationImpl implements MapSerialization {
/**
* Locator to get us all buckets
*/
- private static final ExpandingShardLocator<String>
- BUCKET_LOCATOR = new ExpandingShardLocator<>(MAP_KEY_FUNNEL, NUM_BUCKETS);
+ private static final ExpandingShardLocator<String> BUCKET_LOCATOR =
+ new ExpandingShardLocator<>( MAP_KEY_FUNNEL, NUM_BUCKETS );
private final Keyspace keyspace;
@@ -129,13 +129,20 @@ public class MapSerializationImpl implements MapSerialization {
@Override
public String getString( final MapScope scope, final String key ) {
- Column<Boolean> col = getValue(scope, key); // TODO: why boolean?
- return (col !=null) ? col.getStringValue(): null;
+ Column<Boolean> col = getValue( scope, key );
+ return ( col != null ) ? col.getStringValue() : null;
}
@Override
- public Map<String, String> getStrings(final MapScope scope, final Collection<String> keys ) {
+ public String getStringHighConsistency( final MapScope scope, final String key ) {
+ Column<Boolean> col = getValueHighConsistency( scope, key ); // TODO: why boolean?
+ return ( col != null ) ? col.getStringValue() : null;
+ }
+
+
+ @Override
+ public Map<String, String> getStrings( final MapScope scope, final Collection<String> keys ) {
return getValues( scope, keys, STRING_RESULTS_BUILDER );
}
@@ -144,13 +151,13 @@ public class MapSerializationImpl implements MapSerialization {
public void putString( final MapScope scope, final String key, final String value ) {
final RowOp op = new RowOp() {
@Override
- public void putValue(final ColumnListMutation<Boolean> columnListMutation ) {
+ public void putValue( final ColumnListMutation<Boolean> columnListMutation ) {
columnListMutation.putColumn( true, value );
}
@Override
- public void putKey(final ColumnListMutation<String> keysMutation ) {
+ public void putKey( final ColumnListMutation<String> keysMutation ) {
keysMutation.putColumn( key, true );
}
};
@@ -184,10 +191,6 @@ public class MapSerializationImpl implements MapSerialization {
/**
* Write our string index with the specified row op
- * @param scope
- * @param key
- * @param value
- * @param rowOp
*/
private void writeString( final MapScope scope, final String key, final String value, final RowOp rowOp ) {
@@ -225,10 +228,11 @@ public class MapSerializationImpl implements MapSerialization {
/**
* Callbacks for performing row operations
*/
- private static interface RowOp{
+ private static interface RowOp {
/**
* Callback to do the row
+ *
* @param columnListMutation The column mutation
*/
void putValue( final ColumnListMutation<Boolean> columnListMutation );
@@ -236,104 +240,97 @@ public class MapSerializationImpl implements MapSerialization {
/**
* Write the key
- * @param keysMutation
*/
void putKey( final ColumnListMutation<String> keysMutation );
-
-
}
+
@Override
public UUID getUuid( final MapScope scope, final String key ) {
- Column<Boolean> col = getValue(scope, key);
- return (col !=null) ? col.getUUIDValue(): null;
+ Column<Boolean> col = getValue( scope, key );
+ return ( col != null ) ? col.getUUIDValue() : null;
}
@Override
public void putUuid( final MapScope scope, final String key, final UUID putUuid ) {
- Preconditions.checkNotNull(scope, "mapscope is required");
+ Preconditions.checkNotNull( scope, "mapscope is required" );
Preconditions.checkNotNull( key, "key is required" );
Preconditions.checkNotNull( putUuid, "value is required" );
final MutationBatch batch = keyspace.prepareMutationBatch();
//add it to the entry
- final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey(scope, key);
+ final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
//serialize to the entry
- batch.withRow(MAP_ENTRIES, entryRowKey).putColumn(true, putUuid);
+ batch.withRow( MAP_ENTRIES, entryRowKey ).putColumn( true, putUuid );
//add it to the keys
final int bucket = BUCKET_LOCATOR.getCurrentBucket( key );
- final BucketScopedRowKey< String> keyRowKey =
- BucketScopedRowKey.fromKey( scope.getApplication(), key, bucket);
+ final BucketScopedRowKey<String> keyRowKey = BucketScopedRowKey.fromKey( scope.getApplication(), key, bucket );
//serialize to the entry
- batch.withRow(MAP_KEYS, keyRowKey).putColumn(key, true);
-
- executeBatch(batch);
+ batch.withRow( MAP_KEYS, keyRowKey ).putColumn( key, true );
+ executeBatch( batch );
}
@Override
public Long getLong( final MapScope scope, final String key ) {
- Column<Boolean> col = getValue(scope, key);
- return (col !=null) ? col.getLongValue(): null;
+ Column<Boolean> col = getValue( scope, key );
+ return ( col != null ) ? col.getLongValue() : null;
}
-
-
@Override
public void putLong( final MapScope scope, final String key, final Long value ) {
- Preconditions.checkNotNull(scope, "mapscope is required");
+ Preconditions.checkNotNull( scope, "mapscope is required" );
Preconditions.checkNotNull( key, "key is required" );
Preconditions.checkNotNull( value, "value is required" );
final MutationBatch batch = keyspace.prepareMutationBatch();
//add it to the entry
- final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey(scope, key);
+ final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
//serialize to the entry
- batch.withRow(MAP_ENTRIES, entryRowKey).putColumn(true, value);
+ batch.withRow( MAP_ENTRIES, entryRowKey ).putColumn( true, value );
//add it to the keys
final int bucket = BUCKET_LOCATOR.getCurrentBucket( key );
- final BucketScopedRowKey< String> keyRowKey =
- BucketScopedRowKey.fromKey( scope.getApplication(), key, bucket);
+ final BucketScopedRowKey<String> keyRowKey = BucketScopedRowKey.fromKey( scope.getApplication(), key, bucket );
//serialize to the entry
- batch.withRow(MAP_KEYS, keyRowKey).putColumn(key, true);
+ batch.withRow( MAP_KEYS, keyRowKey ).putColumn( key, true );
- executeBatch(batch);
+ executeBatch( batch );
}
@Override
public void delete( final MapScope scope, final String key ) {
final MutationBatch batch = keyspace.prepareMutationBatch();
- final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey(scope, key);
+ final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
//serialize to the entry
- batch.withRow(MAP_ENTRIES, entryRowKey).delete();
+ batch.withRow( MAP_ENTRIES, entryRowKey ).delete();
//add it to the keys, we're not sure which one it may have come from
- final int[] buckets = BUCKET_LOCATOR.getAllBuckets( key );
+ final int[] buckets = BUCKET_LOCATOR.getAllBuckets( key );
- final List<BucketScopedRowKey<String>>
- rowKeys = BucketScopedRowKey.fromRange( scope.getApplication(), key, buckets );
+ final List<BucketScopedRowKey<String>> rowKeys =
+ BucketScopedRowKey.fromRange( scope.getApplication(), key, buckets );
- for(BucketScopedRowKey<String> rowKey: rowKeys) {
+ for ( BucketScopedRowKey<String> rowKey : rowKeys ) {
batch.withRow( MAP_KEYS, rowKey ).deleteColumn( key );
}
@@ -345,34 +342,53 @@ public class MapSerializationImpl implements MapSerialization {
public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() {
final MultiTennantColumnFamilyDefinition mapEntries =
- new MultiTennantColumnFamilyDefinition( MAP_ENTRIES,
- BytesType.class.getSimpleName(),
- BytesType.class.getSimpleName(),
- BytesType.class.getSimpleName(),
- MultiTennantColumnFamilyDefinition.CacheOption.KEYS );
+ new MultiTennantColumnFamilyDefinition( MAP_ENTRIES, BytesType.class.getSimpleName(),
+ BytesType.class.getSimpleName(), BytesType.class.getSimpleName(),
+ MultiTennantColumnFamilyDefinition.CacheOption.KEYS );
final MultiTennantColumnFamilyDefinition mapKeys =
- new MultiTennantColumnFamilyDefinition( MAP_KEYS,
- BytesType.class.getSimpleName(),
- UTF8Type.class.getSimpleName(),
- BytesType.class.getSimpleName(),
- MultiTennantColumnFamilyDefinition.CacheOption.KEYS );
+ new MultiTennantColumnFamilyDefinition( MAP_KEYS, BytesType.class.getSimpleName(),
+ UTF8Type.class.getSimpleName(), BytesType.class.getSimpleName(),
+ MultiTennantColumnFamilyDefinition.CacheOption.KEYS );
return Arrays.asList( mapEntries, mapKeys );
}
- private Column<Boolean> getValue(MapScope scope, String key) {
+ private Column<Boolean> getValue( MapScope scope, String key ) {
+
+
+ //add it to the entry
+ final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
+
+ //now get all columns, including the "old row key value"
+ try {
+ final Column<Boolean> result =
+ keyspace.prepareQuery( MAP_ENTRIES ).getKey( entryRowKey ).getColumn( true ).execute().getResult();
+
+ return result;
+ }
+ catch ( NotFoundException nfe ) {
+ //nothing to return
+ return null;
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to connect to cassandra", e );
+ }
+ }
+
+ private Column<Boolean> getValueHighConsistency( MapScope scope, String key ) {
//add it to the entry
- final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey(scope, key);
+ final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
//now get all columns, including the "old row key value"
try {
- final Column<Boolean> result = keyspace.prepareQuery( MAP_ENTRIES )
- .getKey( entryRowKey ).getColumn( true ).execute().getResult();
+ final Column<Boolean> result =
+ keyspace.prepareQuery( MAP_ENTRIES ).setConsistencyLevel( ConsistencyLevel.CL_QUORUM )
+ .getKey( entryRowKey ).getColumn( true ).execute().getResult();
return result;
}
@@ -388,52 +404,45 @@ public class MapSerializationImpl implements MapSerialization {
/**
* Get multiple values, using the string builder
- * @param scope
- * @param keys
- * @param builder
- * @param <T>
- * @return
*/
- private <T> T getValues(final MapScope scope, final Collection<String> keys, final ResultsBuilder<T> builder) {
+ private <T> T getValues( final MapScope scope, final Collection<String> keys, final ResultsBuilder<T> builder ) {
final List<ScopedRowKey<MapEntryKey>> rowKeys = new ArrayList<>( keys.size() );
- for(final String key: keys){
- //add it to the entry
- final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey(scope, key);
+ for ( final String key : keys ) {
+ //add it to the entry
+ final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
rowKeys.add( entryRowKey );
-
}
+ //now get all columns, including the "old row key value"
+ try {
+ final Rows<ScopedRowKey<MapEntryKey>, Boolean> rows =
+ keyspace.prepareQuery( MAP_ENTRIES ).getKeySlice( rowKeys ).withColumnSlice( true ).execute()
+ .getResult();
- //now get all columns, including the "old row key value"
- try {
- final Rows<ScopedRowKey<MapEntryKey>, Boolean>
- rows = keyspace.prepareQuery( MAP_ENTRIES ).getKeySlice( rowKeys ).withColumnSlice( true )
- .execute().getResult();
-
-
- return builder.buildResults( rows );
- }
- catch ( NotFoundException nfe ) {
- //nothing to return
- return null;
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to connect to cassandra", e );
- }
- }
+ return builder.buildResults( rows );
+ }
+ catch ( NotFoundException nfe ) {
+ //nothing to return
+ return null;
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to connect to cassandra", e );
+ }
+ }
- private void executeBatch(MutationBatch batch) {
+ private void executeBatch( MutationBatch batch ) {
try {
batch.execute();
- } catch (ConnectionException e) {
- throw new RuntimeException("Unable to connect to cassandra", e);
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to connect to cassandra", e );
}
}
@@ -501,8 +510,7 @@ public class MapSerializationImpl implements MapSerialization {
/**
* Create a scoped row key from the key
*/
- public static ScopedRowKey<MapEntryKey> fromKey(
- final MapScope mapScope, final String key ) {
+ public static ScopedRowKey<MapEntryKey> fromKey( final MapScope mapScope, final String key ) {
return ScopedRowKey.fromKey( mapScope.getApplication(), new MapEntryKey( mapScope.getName(), key ) );
}
@@ -511,32 +519,32 @@ public class MapSerializationImpl implements MapSerialization {
/**
* Build the results from the row keys
- * @param <T>
*/
private static interface ResultsBuilder<T> {
- public T buildResults(final Rows<ScopedRowKey<MapEntryKey>, Boolean> rows);
+ public T buildResults( final Rows<ScopedRowKey<MapEntryKey>, Boolean> rows );
}
- public static class StringResultsBuilder implements ResultsBuilder<Map<String, String>>{
+
+ public static class StringResultsBuilder implements ResultsBuilder<Map<String, String>> {
@Override
public Map<String, String> buildResults( final Rows<ScopedRowKey<MapEntryKey>, Boolean> rows ) {
final int size = rows.size();
- final Map<String, String> results = new HashMap<>(size);
+ final Map<String, String> results = new HashMap<>( size );
- for(int i = 0; i < size; i ++){
+ for ( int i = 0; i < size; i++ ) {
final Row<ScopedRowKey<MapEntryKey>, Boolean> row = rows.getRowByIndex( i );
final String value = row.getColumns().getStringValue( true, null );
- if(value == null){
+ if ( value == null ) {
continue;
}
- results.put( row.getKey().getKey().key, value );
+ results.put( row.getKey().getKey().key, value );
}
return results;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java
index 4f47749..4060dac 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java
@@ -31,6 +31,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.index.SearchEdge;
import org.apache.usergrid.persistence.model.entity.Id;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createIndexDocId;
@@ -42,7 +43,10 @@ import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createInd
@JsonTypeInfo( use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class" )
public class DeIndexOperation implements BatchOperation {
+ @JsonProperty
private String[] indexes;
+
+ @JsonProperty
private String documentId;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperation.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperation.java
index fae809f..28f2e0d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperation.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperation.java
@@ -30,6 +30,7 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
+import com.fasterxml.jackson.annotation.JsonProperty;
/**
@@ -37,9 +38,12 @@ import org.elasticsearch.client.Client;
*/
public class IndexOperation implements BatchOperation {
+ @JsonProperty
public String writeAlias;
+ @JsonProperty
public String documentId;
+ @JsonProperty
public Map<String, Object> data;
public IndexOperation( final String writeAlias, final ApplicationScope applicationScope, IndexEdge indexEdge,
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
index 12df390..bcee308 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
@@ -26,6 +26,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
@@ -33,9 +34,13 @@ import com.google.common.base.Optional;
* Container for index operations.
*/
public class IndexOperationMessage implements Serializable {
+ @JsonProperty
private final Set<IndexOperation> indexRequests;
+
+ @JsonProperty
private final Set<DeIndexOperation> deIndexRequests;
+ @JsonProperty
private long creationTime;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
index d974529..5201279 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
@@ -23,6 +23,7 @@ package org.apache.usergrid.persistence.queue;
import rx.Observable;
import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@@ -67,12 +68,21 @@ public class DefaultQueueManager implements QueueManager {
}
}
+
@Override
- public synchronized void sendMessage(Object body) throws IOException {
+ public <T extends Serializable> void sendMessage( final T body ) throws IOException {
String uuid = UUID.randomUUID().toString();
queue.add(new QueueMessage(uuid,"handle_"+uuid,body,"put type here"));
+
}
+
+ @Override
+ public <T extends Serializable> void sendMessageToTopic( final T body ) throws IOException {
+ sendMessage( body );
+ }
+
+
@Override
public void deleteQueue() {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
index 027abb2..dc3d1b5 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
@@ -68,7 +68,13 @@ public interface QueueManager {
* @param body
* @throws IOException
*/
- void sendMessage(Object body)throws IOException;
+ <T extends Serializable> void sendMessage(T body)throws IOException;
+
+ /**
+ * Send a message to the topic to be sent to other queues
+ * @param body
+ */
+ <T extends Serializable> void sendMessageToTopic(T body) throws IOException;
/**
* purge messages
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index d476f76..59ecd24 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -18,15 +18,55 @@
package org.apache.usergrid.persistence.queue.impl;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
+import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory;
+import org.apache.usergrid.persistence.core.guicyfig.ClusterFig;
+import org.apache.usergrid.persistence.queue.Queue;
+import org.apache.usergrid.persistence.queue.QueueFig;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueMessage;
+import org.apache.usergrid.persistence.queue.QueueScope;
+import org.apache.usergrid.persistence.queue.util.AmazonNotificationUtils;
+
import com.amazonaws.AmazonServiceException;
import com.amazonaws.handlers.AsyncHandler;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sns.AmazonSNSAsyncClient;
import com.amazonaws.services.sns.AmazonSNSClient;
-import com.amazonaws.services.sns.model.*;
+import com.amazonaws.services.sns.model.PublishRequest;
+import com.amazonaws.services.sns.model.PublishResult;
+import com.amazonaws.services.sns.model.SubscribeRequest;
+import com.amazonaws.services.sns.model.SubscribeResult;
+import com.amazonaws.services.sqs.AmazonSQSAsyncClient;
import com.amazonaws.services.sqs.AmazonSQSClient;
-import com.amazonaws.services.sqs.model.*;
+import com.amazonaws.services.sqs.model.BatchResultErrorEntry;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
+import com.amazonaws.services.sqs.model.DeleteMessageRequest;
+import com.amazonaws.services.sqs.model.DeleteQueueRequest;
+import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
+import com.amazonaws.services.sqs.model.GetQueueUrlResult;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import com.amazonaws.services.sqs.model.ReceiveMessageResult;
+import com.amazonaws.services.sqs.model.SendMessageRequest;
+import com.amazonaws.services.sqs.model.SendMessageResult;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -36,20 +76,6 @@ import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
-import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
-import org.apache.usergrid.persistence.core.guicyfig.ClusterFig;
-import org.apache.usergrid.persistence.queue.*;
-import org.apache.usergrid.persistence.queue.Queue;
-import org.apache.usergrid.persistence.queue.util.AmazonNotificationUtils;
-import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
public class SNSQueueManagerImpl implements QueueManager {
@@ -59,10 +85,10 @@ public class SNSQueueManagerImpl implements QueueManager {
private final QueueFig fig;
private final ClusterFig clusterFig;
private final CassandraFig cassandraFig;
- private final QueueFig queueFig;
private final AmazonSQSClient sqs;
private final AmazonSNSClient sns;
private final AmazonSNSAsyncClient snsAsync;
+ private final AmazonSQSAsyncClient sqsAsync;
private final JsonFactory JSON_FACTORY = new JsonFactory();
@@ -110,6 +136,7 @@ public class SNSQueueManagerImpl implements QueueManager {
});
+
@Inject
public SNSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig, ClusterFig clusterFig,
CassandraFig cassandraFig, QueueFig queueFig) {
@@ -117,12 +144,21 @@ public class SNSQueueManagerImpl implements QueueManager {
this.fig = fig;
this.clusterFig = clusterFig;
this.cassandraFig = cassandraFig;
- this.queueFig = queueFig;
+
+
+ // create our own executor which has a bounded queue w/ caller runs policy for rejected tasks
+ final ExecutorService executor = TaskExecutorFactory
+ .createTaskExecutor("amazon-async-io", queueFig.getAsyncMaxThreads(), queueFig.getAsyncQueueSize(),
+ TaskExecutorFactory.RejectionAction.CALLERRUNS);
+
+
+ final Region region = getRegion();
try {
- sqs = createSQSClient(getRegion());
- sns = createSNSClient(getRegion());
- snsAsync = createAsyncSNSClient(getRegion());
+ sqs = createSQSClient(region);
+ sns = createSNSClient(region);
+ snsAsync = createAsyncSNSClient(region, executor);
+ sqsAsync = createAsyncSQSClient( region, executor );
} catch (Exception e) {
throw new RuntimeException("Error setting up mapper", e);
@@ -157,7 +193,7 @@ public class SNSQueueManagerImpl implements QueueManager {
try {
SubscribeRequest primarySubscribeRequest = new SubscribeRequest(primaryTopicArn, "sqs", primaryQueueArn);
- sns.subscribe(primarySubscribeRequest);
+ sns.subscribe(primarySubscribeRequest);
// ensure the SNS primary topic has permission to send to the primary SQS queue
List<String> primaryTopicArnList = new ArrayList<>();
@@ -276,22 +312,35 @@ public class SNSQueueManagerImpl implements QueueManager {
*
*/
- private AmazonSNSAsyncClient createAsyncSNSClient(final Region region) {
+ private AmazonSNSAsyncClient createAsyncSNSClient(final Region region, final ExecutorService executor) {
final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
- // create our own executor which has a bounded queue w/ caller runs policy for rejected tasks
- final Executor executor = TaskExecutorFactory
- .createTaskExecutor("amazon-async-io", queueFig.getAsyncMaxThreads(), queueFig.getAsyncQueueSize(),
- TaskExecutorFactory.RejectionAction.CALLERRUNS);
-
- final AmazonSNSAsyncClient sns = new AmazonSNSAsyncClient(ugProvider.getCredentials(), (ExecutorService) executor);
+ final AmazonSNSAsyncClient sns = new AmazonSNSAsyncClient(ugProvider.getCredentials(), executor);
sns.setRegion(region);
return sns;
}
+
+ /**
+ * Create the async sqs client
+ * @param region
+ * @param executor
+ * @return
+ */
+ private AmazonSQSAsyncClient createAsyncSQSClient(final Region region, final ExecutorService executor){
+ final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
+
+ final AmazonSQSAsyncClient sqs = new AmazonSQSAsyncClient( ugProvider.getCredentials(), executor );
+
+ sqs.setRegion( region );
+
+ return sqs;
+
+ }
+
/**
* The Synchronous SNS client is used for creating topics and subscribing queues.
*
@@ -369,7 +418,12 @@ public class SNSQueueManagerImpl implements QueueManager {
try {
final JsonNode bodyNode = mapper.readTree(message.getBody());
JsonNode bodyObj = bodyNode.has("Message") ? bodyNode.get("Message") : bodyNode;
- body = fromString(bodyObj.textValue(), klass);
+
+
+
+ final String bodyText = mapper.writeValueAsString( bodyObj );;
+
+ body = fromString(bodyText, klass);
} catch (Exception e) {
logger.error(String.format("failed to deserialize message: %s", message.getBody()), e);
throw new RuntimeException(e);
@@ -405,6 +459,40 @@ public class SNSQueueManagerImpl implements QueueManager {
}
}
+
+ @Override
+ public <T extends Serializable> void sendMessageToTopic( final T body ) throws IOException {
+ if (snsAsync == null) {
+ logger.error("SNS client is null, perhaps it failed to initialize successfully");
+ return;
+ }
+
+ final String stringBody = toString(body);
+
+ String topicArn = getWriteTopicArn();
+
+ if (logger.isDebugEnabled()) logger.debug("Publishing Message...{} to arn: {}", stringBody, topicArn);
+
+ PublishRequest publishRequest = new PublishRequest(topicArn, stringBody);
+
+ snsAsync.publishAsync( publishRequest, new AsyncHandler<PublishRequest, PublishResult>() {
+ @Override
+ public void onError( Exception e ) {
+ logger.error( "Error publishing message... {}", e );
+ }
+
+
+ @Override
+ public void onSuccess( PublishRequest request, PublishResult result ) {
+ if ( logger.isDebugEnabled() ) logger
+ .debug( "Successfully published... messageID=[{}], arn=[{}]", result.getMessageId(),
+ request.getTopicArn() );
+ }
+ } );
+
+ }
+
+
@Override
public void sendMessages(final List bodies) throws IOException {
@@ -414,41 +502,47 @@ public class SNSQueueManagerImpl implements QueueManager {
}
for (Object body : bodies) {
- sendMessage(body);
+ sendMessage((Serializable)body);
}
}
+
@Override
- public void sendMessage(final Object body) throws IOException {
+ public <T extends Serializable> void sendMessage( final T body ) throws IOException {
- if (snsAsync == null) {
- logger.error("SNS client is null, perhaps it failed to initialize successfully");
+ if ( snsAsync == null ) {
+ logger.error( "SNS client is null, perhaps it failed to initialize successfully" );
return;
}
- final String stringBody = toString(body);
+ final String stringBody = toString( body );
- String topicArn = getWriteTopicArn();
+ String url = getReadQueue().getUrl();
- if (logger.isDebugEnabled()) logger.debug("Publishing Message...{} to arn: {}", stringBody, topicArn);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "Publishing Message...{} to url: {}", stringBody, url );
+ }
- PublishRequest publishRequest = new PublishRequest(topicArn, stringBody);
+ SendMessageRequest request = new SendMessageRequest( url, stringBody );
- snsAsync.publishAsync(publishRequest, new AsyncHandler<PublishRequest, PublishResult>() {
- @Override
- public void onError(Exception e) {
- logger.error("Error publishing message... {}", e);
- }
+ sqsAsync.sendMessageAsync( request, new AsyncHandler<SendMessageRequest, SendMessageResult>() {
@Override
- public void onSuccess(PublishRequest request, PublishResult result) {
- if (logger.isDebugEnabled())
- logger.debug("Successfully published... messageID=[{}], arn=[{}]", result.getMessageId(), request.getTopicArn());
+ public void onError( final Exception e ) {
+ logger.error( "Error sending message... {}", e );
}
- });
+
+ @Override
+ public void onSuccess( final SendMessageRequest request, final SendMessageResult sendMessageResult ) {
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "Successfully send... messageBody=[{}], url=[{}]", request.getMessageBody(),
+ request.getQueueUrl() );
+ }
+ }
+ } );
}
@Override
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index fa9a7ac..0c56c05 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -19,6 +19,7 @@ package org.apache.usergrid.persistence.queue.impl;
import java.io.IOException;
+import java.io.Serializable;
import java.util.*;
import java.util.concurrent.ExecutionException;
@@ -243,25 +244,34 @@ public class SQSQueueManagerImpl implements QueueManager {
}
+
@Override
- public void sendMessage(final Object body) throws IOException {
+ public <T extends Serializable> void sendMessage( final T body ) throws IOException {
if (sqs == null) {
- logger.error("Sqs is null");
- return;
- }
+ logger.error("Sqs is null");
+ return;
+ }
- String url = getQueue().getUrl();
+ String url = getQueue().getUrl();
- if (logger.isDebugEnabled()) logger.debug("Sending Message...{} to {}", body.toString(), url);
+ if (logger.isDebugEnabled()) logger.debug("Sending Message...{} to {}", body.toString(), url);
- final String stringBody = toString(body);
+ final String stringBody = toString(body);
- SendMessageRequest request = new SendMessageRequest(url, stringBody);
- sqs.sendMessage(request);
+ SendMessageRequest request = new SendMessageRequest(url, stringBody);
+ sqs.sendMessage(request);
+ }
+
+
+
+ @Override
+ public <T extends Serializable> void sendMessageToTopic( final T body ) throws IOException {
+ sendMessage( body );
}
+
@Override
public void commitMessage(final QueueMessage queueMessage) {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
index bca9a49..bc55ff4 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.services.queues;
import java.io.IOException;
+import java.io.Serializable;
import java.util.List;
import org.apache.usergrid.persistence.queue.QueueManager;
@@ -65,7 +66,13 @@ public class ImportQueueManager implements QueueManager {
@Override
- public void sendMessage( final Object body ) throws IOException {
+ public <T extends Serializable> void sendMessage( final T body ) throws IOException {
+
+ }
+
+
+ @Override
+ public <T extends Serializable> void sendMessageToTopic( final T body ) throws IOException {
}
[04/12] usergrid git commit: Fixes empty payload notification issue.
Posted by mr...@apache.org.
Fixes empty payload notification issue.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/0326629a
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/0326629a
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/0326629a
Branch: refs/heads/2.1-release
Commit: 0326629a24cec3bd44d91810b4b8f0516c69c9b8
Parents: 3e15585
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Oct 19 13:53:30 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Oct 19 13:53:30 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 55 ++++++++++++--------
.../asyncevents/AsyncIndexProvider.java | 4 +-
.../index/AmazonAsyncEventServiceTest.java | 2 +-
3 files changed, 36 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0326629a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index f8ef5e7..6b2eb45 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -89,6 +89,21 @@ import rx.Subscription;
import rx.schedulers.Schedulers;
+/**
+ * TODO, this whole class is becoming a nightmare. We need to remove all consume logic from this class and refactor it in the following manner.
+ *
+ * 1. Produce. Keep the code in the handle as is
+ * 2. Consume: Move the code into a refactored system
+ * 2.1 A central dispatcher
+ * 2.2 An interface that produces an observable of type BatchOperation. Any handler will be refactored into its own
+ * impl that will then emit a stream of batch operations to perform
+ * 2.3 The central dispatcher will then subscribe to these events and merge them. Handing them off to a batch handler
+ * 2.4 The batch handler will roll up the operations into a batch size, and then queue them
+ * 2.5 The receive batch handler will execute the batch operations
+ *
+ * TODO determine how we error handle?
+ *
+ */
@Singleton
public class AmazonAsyncEventService implements AsyncEventService {
@@ -360,7 +375,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) {
IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(
applicationScope );
- offerTopic(new InitializeApplicationIndexEvent(queueFig.getPrimaryRegion(), new ReplicatedIndexLocationStrategy(indexLocationStrategy)));
+ offerTopic( new InitializeApplicationIndexEvent( queueFig.getPrimaryRegion(),
+ new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ) );
}
@@ -503,35 +519,29 @@ public class AmazonAsyncEventService implements AsyncEventService {
final String message = esMapPersistence.getString( messageId.toString() );
- String highConsistency = null;
+ final IndexOperationMessage indexOperationMessage;
if(message == null){
logger.error( "Received message with id {} to process, unable to find it, reading with higher consistency level" );
- highConsistency = esMapPersistence.getStringHighConsistency( messageId.toString() );
-
- }
+ final String highConsistency = esMapPersistence.getStringHighConsistency( messageId.toString() );
- //read the value from the string
+ if(highConsistency == null){
+ logger.error( "Unable to find the ES batch with id {} to process at a higher consistency level" );
- final IndexOperationMessage indexOperationMessage;
+ throw new RuntimeException( "Unable to find the ES batch to process with message id " + messageId );
+ }
- //our original local read has it, parse it.
- if(message != null){
- indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class );
- }
- //we tried to read it at a higher consistency level and it works
- else if (highConsistency != null){
indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( highConsistency, IndexOperationMessage.class );
- }
- //we couldn't find it, bail
- else{
- logger.error( "Unable to find the ES batch with id {} to process at a higher consistency level" );
-
- throw new RuntimeException( "Unable to find the ES batch to process with message id " + messageId );
+ } else{
+ indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class );
}
+ //read the value from the string
+
+ Preconditions.checkNotNull( indexOperationMessage, "indexOperationMessage cannot be null" );
+ Preconditions.checkArgument( !indexOperationMessage.isEmpty() , "queued indexOperationMessage messages should not be empty" );
//now execute it
@@ -728,9 +738,10 @@ public class AmazonAsyncEventService implements AsyncEventService {
.map(result -> result.getQueueMessage().get())
.collect(Collectors.toList());
- //send the batch
- //TODO: should retry?
- queueIndexOperationMessage( combined );
+ //only Q it if it's not empty
+ if(!combined.isEmpty()) {
+ queueIndexOperationMessage( combined );
+ }
return messagesToAck;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0326629a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index 1649046..2bace8d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -105,10 +105,10 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
case LOCAL:
return new InMemoryAsyncEventService(eventBuilder, rxTaskScheduler, indexProducer,indexProcessorFig.resolveSynchronously());
case SQS:
- throw new IllegalArgumentException("Configuration value of SQS is no longer allowed. Use SNS instead");
+ throw new IllegalArgumentException("Configuration value of SQS is no longer allowed. Use SNS instead with only a single region");
case SNS:
return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
- entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler );
+ entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler );
default:
throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed");
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0326629a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
index 8ee47a2..625a8fd 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
@@ -93,7 +93,7 @@ public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest {
@Override
protected AsyncEventService getAsyncEventService() {
- return new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, rxTaskScheduler );
+ return new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler );
}
[02/12] usergrid git commit: Fixes serialization tests and verifies
full end to end functionality.
Posted by mr...@apache.org.
Fixes serialization tests and verifies full end to end functionality.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/04a3f47b
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/04a3f47b
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/04a3f47b
Branch: refs/heads/2.1-release
Commit: 04a3f47bd86ec0674ff487f479327e0f174f0425
Parents: 94a9078
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Oct 19 12:00:56 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Oct 19 12:00:56 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 70 ++-
.../asyncevents/model/AsyncEvent.java | 5 +-
.../index/IndexProcessorFig.java | 7 +-
.../util/ObjectJsonSerializer.java | 28 +-
.../persistence/queue/QueueManager.java | 2 +-
.../persistence/queue/guice/QueueModule.java | 1 -
.../queue/impl/SNSQueueManagerImpl.java | 515 ++++++++++---------
.../queue/impl/SQSQueueManagerImpl.java | 362 -------------
8 files changed, 336 insertions(+), 654 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index c9f0953..d319ac8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -29,19 +29,13 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import com.google.common.base.Optional;
-
-import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent;
-import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.corepersistence.util.ObjectJsonSerializer;
-import org.apache.usergrid.exception.NotImplementedException;
-import org.apache.usergrid.persistence.index.impl.IndexProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.corepersistence.asyncevents.model.AsyncEvent;
import org.apache.usergrid.corepersistence.asyncevents.model.EdgeDeleteEvent;
import org.apache.usergrid.corepersistence.asyncevents.model.EdgeIndexEvent;
+import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent;
import org.apache.usergrid.corepersistence.asyncevents.model.EntityDeleteEvent;
import org.apache.usergrid.corepersistence.asyncevents.model.EntityIndexEvent;
import org.apache.usergrid.corepersistence.asyncevents.model.InitializeApplicationIndexEvent;
@@ -50,6 +44,8 @@ import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
import org.apache.usergrid.corepersistence.index.ReplicatedIndexLocationStrategy;
import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.corepersistence.util.ObjectJsonSerializer;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
@@ -61,6 +57,7 @@ import org.apache.usergrid.persistence.index.EntityIndex;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.IndexLocationStrategy;
import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
import org.apache.usergrid.persistence.map.MapManager;
import org.apache.usergrid.persistence.map.MapManagerFactory;
import org.apache.usergrid.persistence.map.MapScope;
@@ -78,6 +75,7 @@ import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -94,8 +92,6 @@ public class AmazonAsyncEventService implements AsyncEventService {
private static final Logger logger = LoggerFactory.getLogger(AmazonAsyncEventService.class);
- private static final ObjectJsonSerializer OBJECT_JSON_SERIALIZER = new ObjectJsonSerializer( );
-
// SQS maximum receive messages is 10
private static final int MAX_TAKE = 10;
public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars
@@ -192,6 +188,22 @@ public class AmazonAsyncEventService implements AsyncEventService {
}
}
+
+ private void offerTopic( final Serializable operation ) {
+ final Timer.Context timer = this.writeTimer.time();
+
+ try {
+ //signal to SQS
+ this.queue.sendMessageToTopic( operation );
+ }
+ catch ( IOException e ) {
+ throw new RuntimeException( "Unable to queue message", e );
+ }
+ finally {
+ timer.stop();
+ }
+ }
+
private void offerBatch(final List operations){
final Timer.Context timer = this.writeTimer.time();
@@ -226,27 +238,6 @@ public class AmazonAsyncEventService implements AsyncEventService {
}
- /**
- * Ack message in SQS
- */
- public void ack(final QueueMessage message) {
-
- final Timer.Context timer = this.ackTimer.time();
-
- try{
- queue.commitMessage( message );
-
- //decrement our in-flight counter
- inFlight.decrementAndGet();
-
- }catch(Exception e){
- throw new RuntimeException("Unable to ack messages", e);
- }finally {
- timer.stop();
- }
-
-
- }
/**
* Ack message in SQS
@@ -355,7 +346,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) {
IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(
applicationScope );
- offer(new InitializeApplicationIndexEvent(new ReplicatedIndexLocationStrategy(indexLocationStrategy)));
+ offerTopic(
+ new InitializeApplicationIndexEvent( new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ) );
}
@@ -468,7 +460,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
*/
public void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage ) {
- final String jsonValue = OBJECT_JSON_SERIALIZER.toByteBuffer( indexOperationMessage );
+ final String jsonValue = ObjectJsonSerializer.INSTANCE.toString( indexOperationMessage );
final UUID newMessageId = UUIDGenerator.newTimeUUID();
@@ -482,12 +474,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
final ElasticsearchIndexEvent elasticsearchIndexEvent = new ElasticsearchIndexEvent( newMessageId );
//send to the topic so all regions index the batch
- try {
- queue.sendMessageToTopic( elasticsearchIndexEvent );
- }
- catch ( IOException e ) {
- throw new RuntimeException( "Unable to pulish to topic", e );
- }
+
+ offerTopic( elasticsearchIndexEvent );
}
public void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){
@@ -505,7 +493,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
String highConsistency = null;
if(message == null){
- logger.error( "Receive message with id {} to process, unable to find it, reading with higher consistency level" );
+ logger.error( "Received message with id {} to process, unable to find it, reading with higher consistency level" );
highConsistency = esMapPersistence.getStringHighConsistency( messageId.toString() );
@@ -517,11 +505,11 @@ public class AmazonAsyncEventService implements AsyncEventService {
//our original local read has it, parse it.
if(message != null){
- indexOperationMessage = OBJECT_JSON_SERIALIZER.fromString( message, IndexOperationMessage.class );
+ indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class );
}
//we tried to read it at a higher consistency level and it works
else if (highConsistency != null){
- indexOperationMessage = OBJECT_JSON_SERIALIZER.fromString( highConsistency, IndexOperationMessage.class );
+ indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( highConsistency, IndexOperationMessage.class );
}
//we couldn't find it, bail
http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
index 1af54e3..7c51003 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
@@ -30,8 +30,11 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
/**
* Marker class for serialization
+ *
+ * Note that when you add a subtype, you will need to add its serialization value below in the JsonSubTypes annotation.
+ *
+ * Each name must be unique, and must map to a subclass that is serialized
*/
-
@JsonIgnoreProperties( ignoreUnknown = true )
@JsonTypeInfo( use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT, property = "type" )
@JsonSubTypes( {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index 6fd73b4..ec9b315 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -105,10 +105,13 @@ public interface IndexProcessorFig extends GuicyFig {
boolean resolveSynchronously();
/**
- * Get the message TTL in milliseconds
+ * Get the message TTL in milliseconds. Defaults to 24 hours
+ *
+ * 24 * 60 * 60 * 1000
+ *
* @return
*/
- @Default("604800000")
+ @Default("86400000")
@Key( "elasticsearch.message.ttl" )
int getIndexMessageTtl();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java
index dbd5ca3..4e5873a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.Serializable;
import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
@@ -33,16 +34,33 @@ import com.google.common.base.Preconditions;
public final class ObjectJsonSerializer {
- private final JsonFactory JSON_FACTORY = new JsonFactory();
+ private static final JsonFactory JSON_FACTORY = new JsonFactory();
- private final ObjectMapper MAPPER = new ObjectMapper( JSON_FACTORY );
+ private static final ObjectMapper MAPPER = new ObjectMapper( JSON_FACTORY );
+
+ static{
+
+ /**
+ * Because of the way SNS escapes all our json, we have to tell jackson to accept it. See the documentation
+ * here for how SNS borks the message body
+ *
+ * http://docs.aws.amazon.com/sns/latest/dg/SendMessageToHttp.html
+ */
+ MAPPER.configure( JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, true );
+ }
+
+ /**
+ * Singleton instance of our serializer; instantiating it and configuring the mapper is expensive.
+ */
+ public static final ObjectJsonSerializer INSTANCE = new ObjectJsonSerializer();
+
+
+ private ObjectJsonSerializer( ) {
- public ObjectJsonSerializer( ) {
- MAPPER.enableDefaultTypingAsProperty( ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class" );
}
- public <T extends Serializable> String toByteBuffer( final T toSerialize ) {
+ public <T extends Serializable> String toString( final T toSerialize ) {
Preconditions.checkNotNull( toSerialize, "toSerialize must not be null" );
final String stringValue;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
index dc3d1b5..34a3654 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
@@ -68,7 +68,7 @@ public interface QueueManager {
* @param body
* @throws IOException
*/
- <T extends Serializable> void sendMessage(T body)throws IOException;
+ <T extends Serializable> void sendMessage(T body)throws IOException;
/**
* Send a messae to the topic to be sent to other queues
http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
index dd1fe16..caf61bf 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
@@ -26,7 +26,6 @@ import org.safehaus.guicyfig.GuicyFigModule;
import org.apache.usergrid.persistence.queue.QueueFig;
import org.apache.usergrid.persistence.queue.QueueManager;
import org.apache.usergrid.persistence.queue.QueueManagerFactory;
-import org.apache.usergrid.persistence.queue.impl.SQSQueueManagerImpl;
import com.google.inject.AbstractModule;
import com.google.inject.assistedinject.FactoryModuleBuilder;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 59ecd24..a3fa05e 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -68,6 +68,7 @@ import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.amazonaws.services.sqs.model.SendMessageResult;
import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
@@ -77,9 +78,10 @@ import com.google.common.cache.LoadingCache;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
+
public class SNSQueueManagerImpl implements QueueManager {
- private static final Logger logger = LoggerFactory.getLogger(SNSQueueManagerImpl.class);
+ private static final Logger logger = LoggerFactory.getLogger( SNSQueueManagerImpl.class );
private final QueueScope scope;
private final QueueFig fig;
@@ -91,55 +93,64 @@ public class SNSQueueManagerImpl implements QueueManager {
private final AmazonSQSAsyncClient sqsAsync;
- private final JsonFactory JSON_FACTORY = new JsonFactory();
- private final ObjectMapper mapper = new ObjectMapper(JSON_FACTORY);
+ private static final JsonFactory JSON_FACTORY = new JsonFactory();
+ private static final ObjectMapper mapper = new ObjectMapper( JSON_FACTORY );
+
+ static {
+
+ /**
+ * Because of the way SNS escapes all our json, we have to tell jackson to accept it. See the documentation
+ * here for how SNS borks the message body
+ *
+ * http://docs.aws.amazon.com/sns/latest/dg/SendMessageToHttp.html
+ */
+ mapper.configure( JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, true );
+ }
- private final LoadingCache<String, String> writeTopicArnMap = CacheBuilder.newBuilder()
- .maximumSize(1000)
- .build(new CacheLoader<String, String>() {
+ private final LoadingCache<String, String> writeTopicArnMap =
+ CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<String, String>() {
@Override
- public String load(String queueName)
- throws Exception {
+ public String load( String queueName ) throws Exception {
- return setupTopics(queueName);
+ return setupTopics( queueName );
}
- });
+ } );
- private final LoadingCache<String, Queue> readQueueUrlMap = CacheBuilder.newBuilder()
- .maximumSize(1000)
- .build(new CacheLoader<String, Queue>() {
+ private final LoadingCache<String, Queue> readQueueUrlMap =
+ CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<String, Queue>() {
@Override
- public Queue load(String queueName) throws Exception {
+ public Queue load( String queueName ) throws Exception {
Queue queue = null;
try {
- GetQueueUrlResult result = sqs.getQueueUrl(queueName);
- queue = new Queue(result.getQueueUrl());
- } catch (QueueDoesNotExistException queueDoesNotExistException) {
- logger.error("Queue {} does not exist, will create", queueName);
- } catch (Exception e) {
- logger.error("failed to get queue from service", e);
+ GetQueueUrlResult result = sqs.getQueueUrl( queueName );
+ queue = new Queue( result.getQueueUrl() );
+ }
+ catch ( QueueDoesNotExistException queueDoesNotExistException ) {
+ logger.error( "Queue {} does not exist, will create", queueName );
+ }
+ catch ( Exception e ) {
+ logger.error( "failed to get queue from service", e );
throw e;
}
- if (queue == null) {
- String url = AmazonNotificationUtils.createQueue(sqs, queueName, fig);
- queue = new Queue(url);
+ if ( queue == null ) {
+ String url = AmazonNotificationUtils.createQueue( sqs, queueName, fig );
+ queue = new Queue( url );
}
- setupTopics(queueName);
+ setupTopics( queueName );
return queue;
}
- });
-
+ } );
@Inject
- public SNSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig, ClusterFig clusterFig,
- CassandraFig cassandraFig, QueueFig queueFig) {
+ public SNSQueueManagerImpl( @Assisted QueueScope scope, QueueFig fig, ClusterFig clusterFig,
+ CassandraFig cassandraFig, QueueFig queueFig ) {
this.scope = scope;
this.fig = fig;
this.clusterFig = clusterFig;
@@ -148,177 +159,179 @@ public class SNSQueueManagerImpl implements QueueManager {
// create our own executor which has a bounded queue w/ caller runs policy for rejected tasks
final ExecutorService executor = TaskExecutorFactory
- .createTaskExecutor("amazon-async-io", queueFig.getAsyncMaxThreads(), queueFig.getAsyncQueueSize(),
- TaskExecutorFactory.RejectionAction.CALLERRUNS);
+ .createTaskExecutor( "amazon-async-io", queueFig.getAsyncMaxThreads(), queueFig.getAsyncQueueSize(),
+ TaskExecutorFactory.RejectionAction.CALLERRUNS );
final Region region = getRegion();
try {
- sqs = createSQSClient(region);
- sns = createSNSClient(region);
- snsAsync = createAsyncSNSClient(region, executor);
+ sqs = createSQSClient( region );
+ sns = createSNSClient( region );
+ snsAsync = createAsyncSNSClient( region, executor );
sqsAsync = createAsyncSQSClient( region, executor );
-
- } catch (Exception e) {
- throw new RuntimeException("Error setting up mapper", e);
+ }
+ catch ( Exception e ) {
+ throw new RuntimeException( "Error setting up mapper", e );
}
}
- private String setupTopics(final String queueName)
- throws Exception {
- logger.info("Setting up setupTopics SNS/SQS...");
+ private String setupTopics( final String queueName ) throws Exception {
- String primaryTopicArn = AmazonNotificationUtils.getTopicArn(sns, queueName, true);
+ logger.info( "Setting up setupTopics SNS/SQS..." );
- if (logger.isDebugEnabled()) logger.debug("SNS/SQS Setup: primaryTopicArn=" + primaryTopicArn);
+ String primaryTopicArn = AmazonNotificationUtils.getTopicArn( sns, queueName, true );
- String queueUrl = AmazonNotificationUtils.getQueueUrlByName(sqs, queueName);
- String primaryQueueArn = AmazonNotificationUtils.getQueueArnByName(sqs, queueName);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "SNS/SQS Setup: primaryTopicArn=" + primaryTopicArn );
+ }
+
+ String queueUrl = AmazonNotificationUtils.getQueueUrlByName( sqs, queueName );
+ String primaryQueueArn = AmazonNotificationUtils.getQueueArnByName( sqs, queueName );
- if (logger.isDebugEnabled()) logger.debug("SNS/SQS Setup: primaryQueueArn=" + primaryQueueArn);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "SNS/SQS Setup: primaryQueueArn=" + primaryQueueArn );
+ }
- if (primaryQueueArn == null) {
- if (logger.isDebugEnabled())
- logger.debug("SNS/SQS Setup: primaryQueueArn is null, creating queue...");
+ if ( primaryQueueArn == null ) {
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "SNS/SQS Setup: primaryQueueArn is null, creating queue..." );
+ }
- queueUrl = AmazonNotificationUtils.createQueue(sqs, queueName, fig);
- primaryQueueArn = AmazonNotificationUtils.getQueueArnByUrl(sqs, queueUrl);
+ queueUrl = AmazonNotificationUtils.createQueue( sqs, queueName, fig );
+ primaryQueueArn = AmazonNotificationUtils.getQueueArnByUrl( sqs, queueUrl );
- if (logger.isDebugEnabled())
- logger.debug("SNS/SQS Setup: New Queue URL=[{}] ARN=[{}]", queueUrl, primaryQueueArn);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "SNS/SQS Setup: New Queue URL=[{}] ARN=[{}]", queueUrl, primaryQueueArn );
+ }
}
try {
- SubscribeRequest primarySubscribeRequest = new SubscribeRequest(primaryTopicArn, "sqs", primaryQueueArn);
- sns.subscribe(primarySubscribeRequest);
+ SubscribeRequest primarySubscribeRequest = new SubscribeRequest( primaryTopicArn, "sqs", primaryQueueArn );
+ sns.subscribe( primarySubscribeRequest );
// ensure the SNS primary topic has permission to send to the primary SQS queue
List<String> primaryTopicArnList = new ArrayList<>();
- primaryTopicArnList.add(primaryTopicArn);
- AmazonNotificationUtils.setQueuePermissionsToReceive(sqs, queueUrl, primaryTopicArnList);
- } catch (AmazonServiceException e) {
- logger.error(String.format("Unable to subscribe PRIMARY queue=[%s] to topic=[%s]", queueUrl, primaryTopicArn), e);
+ primaryTopicArnList.add( primaryTopicArn );
+ AmazonNotificationUtils.setQueuePermissionsToReceive( sqs, queueUrl, primaryTopicArnList );
+ }
+ catch ( AmazonServiceException e ) {
+ logger.error(
+ String.format( "Unable to subscribe PRIMARY queue=[%s] to topic=[%s]", queueUrl, primaryTopicArn ), e );
}
- if (fig.isMultiRegion() && scope.getRegionImplementation() == QueueScope.RegionImplementation.ALL) {
+ if ( fig.isMultiRegion() && scope.getRegionImplementation() == QueueScope.RegionImplementation.ALL ) {
String multiRegion = fig.getRegionList();
- if (logger.isDebugEnabled())
- logger.debug("MultiRegion Setup specified, regions: [{}]", multiRegion);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "MultiRegion Setup specified, regions: [{}]", multiRegion );
+ }
- String[] regionNames = multiRegion.split(",");
+ String[] regionNames = multiRegion.split( "," );
- final Map<String, String> arrQueueArns = new HashMap<>(regionNames.length + 1);
- final Map<String, String> topicArns = new HashMap<>(regionNames.length + 1);
+ final Map<String, String> arrQueueArns = new HashMap<>( regionNames.length + 1 );
+ final Map<String, String> topicArns = new HashMap<>( regionNames.length + 1 );
- arrQueueArns.put(primaryQueueArn, fig.getRegion());
- topicArns.put(primaryTopicArn, fig.getRegion());
+ arrQueueArns.put( primaryQueueArn, fig.getRegion() );
+ topicArns.put( primaryTopicArn, fig.getRegion() );
- for (String regionName : regionNames) {
+ for ( String regionName : regionNames ) {
regionName = regionName.trim();
- Regions regions = Regions.fromName(regionName);
- Region region = Region.getRegion(regions);
+ Regions regions = Regions.fromName( regionName );
+ Region region = Region.getRegion( regions );
- AmazonSQSClient sqsClient = createSQSClient(region);
- AmazonSNSClient snsClient = createSNSClient(region); // do this stuff synchronously
+ AmazonSQSClient sqsClient = createSQSClient( region );
+ AmazonSNSClient snsClient = createSNSClient( region ); // do this stuff synchronously
// getTopicArn will create the SNS topic if it doesn't exist
- String topicArn = AmazonNotificationUtils.getTopicArn(snsClient, queueName, true);
- topicArns.put(topicArn, regionName);
+ String topicArn = AmazonNotificationUtils.getTopicArn( snsClient, queueName, true );
+ topicArns.put( topicArn, regionName );
// create the SQS queue if it doesn't exist
- String queueArn = AmazonNotificationUtils.getQueueArnByName(sqsClient, queueName);
- if (queueArn == null) {
- queueUrl = AmazonNotificationUtils.createQueue(sqsClient, queueName, fig);
- queueArn = AmazonNotificationUtils.getQueueArnByUrl(sqsClient, queueUrl);
+ String queueArn = AmazonNotificationUtils.getQueueArnByName( sqsClient, queueName );
+ if ( queueArn == null ) {
+ queueUrl = AmazonNotificationUtils.createQueue( sqsClient, queueName, fig );
+ queueArn = AmazonNotificationUtils.getQueueArnByUrl( sqsClient, queueUrl );
}
- arrQueueArns.put(queueArn, regionName);
+ arrQueueArns.put( queueArn, regionName );
}
- logger.debug("Creating Subscriptions...");
+ logger.debug( "Creating Subscriptions..." );
- for (Map.Entry<String, String> queueArnEntry : arrQueueArns.entrySet()) {
+ for ( Map.Entry<String, String> queueArnEntry : arrQueueArns.entrySet() ) {
String queueARN = queueArnEntry.getKey();
String strSqsRegion = queueArnEntry.getValue();
- Regions sqsRegions = Regions.fromName(strSqsRegion);
- Region sqsRegion = Region.getRegion(sqsRegions);
+ Regions sqsRegions = Regions.fromName( strSqsRegion );
+ Region sqsRegion = Region.getRegion( sqsRegions );
- AmazonSQSClient subscribeSqsClient = createSQSClient(sqsRegion);
+ AmazonSQSClient subscribeSqsClient = createSQSClient( sqsRegion );
// ensure the URL used to subscribe is for the correct name/region
- String subscribeQueueUrl = AmazonNotificationUtils.getQueueUrlByName(subscribeSqsClient, queueName);
+ String subscribeQueueUrl = AmazonNotificationUtils.getQueueUrlByName( subscribeSqsClient, queueName );
// this list used later for adding permissions to queues
List<String> topicArnList = new ArrayList<>();
- for (Map.Entry<String, String> topicArnEntry : topicArns.entrySet()) {
+ for ( Map.Entry<String, String> topicArnEntry : topicArns.entrySet() ) {
String topicARN = topicArnEntry.getKey();
- topicArnList.add(topicARN);
+ topicArnList.add( topicARN );
String strSnsRegion = topicArnEntry.getValue();
- Regions snsRegions = Regions.fromName(strSnsRegion);
- Region snsRegion = Region.getRegion(snsRegions);
+ Regions snsRegions = Regions.fromName( strSnsRegion );
+ Region snsRegion = Region.getRegion( snsRegions );
- AmazonSNSClient subscribeSnsClient = createSNSClient(snsRegion); // do this stuff synchronously
- SubscribeRequest subscribeRequest = new SubscribeRequest(topicARN, "sqs", queueARN);
+ AmazonSNSClient subscribeSnsClient = createSNSClient( snsRegion ); // do this stuff synchronously
+ SubscribeRequest subscribeRequest = new SubscribeRequest( topicARN, "sqs", queueARN );
try {
- logger.info("Subscribing Queue ARN/Region=[{} / {}] and Topic ARN/Region=[{} / {}]",
- queueARN,
- strSqsRegion,
- topicARN,
- strSnsRegion
- );
+ logger.info( "Subscribing Queue ARN/Region=[{} / {}] and Topic ARN/Region=[{} / {}]", queueARN,
+ strSqsRegion, topicARN, strSnsRegion );
- SubscribeResult subscribeResult = subscribeSnsClient.subscribe(subscribeRequest);
+ SubscribeResult subscribeResult = subscribeSnsClient.subscribe( subscribeRequest );
String subscriptionARN = subscribeResult.getSubscriptionArn();
- if(logger.isDebugEnabled()){
- logger.debug("Successfully subscribed Queue ARN=[{}] to Topic ARN=[{}], subscription ARN=[{}]", queueARN, topicARN, subscriptionARN);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug(
+ "Successfully subscribed Queue ARN=[{}] to Topic ARN=[{}], subscription ARN=[{}]",
+ queueARN, topicARN, subscriptionARN );
}
-
-
- } catch (Exception e) {
- logger.error(String.format("ERROR Subscribing Queue ARN/Region=[%s / %s] and Topic ARN/Region=[%s / %s]",
- queueARN,
- strSqsRegion,
- topicARN,
- strSnsRegion), e);
-
-
+ }
+ catch ( Exception e ) {
+ logger.error( String
+ .format( "ERROR Subscribing Queue ARN/Region=[%s / %s] and Topic ARN/Region=[%s / %s]",
+ queueARN, strSqsRegion, topicARN, strSnsRegion ), e );
}
}
- logger.info("Adding permission to receive messages...");
+ logger.info( "Adding permission to receive messages..." );
// add permission to each queue, providing a list of topics that it's subscribed to
- AmazonNotificationUtils.setQueuePermissionsToReceive(subscribeSqsClient, subscribeQueueUrl, topicArnList);
-
+ AmazonNotificationUtils
+ .setQueuePermissionsToReceive( subscribeSqsClient, subscribeQueueUrl, topicArnList );
}
}
return primaryTopicArn;
}
+
/**
* The Asynchronous SNS client is used for publishing events to SNS.
- *
*/
- private AmazonSNSAsyncClient createAsyncSNSClient(final Region region, final ExecutorService executor) {
+ private AmazonSNSAsyncClient createAsyncSNSClient( final Region region, final ExecutorService executor ) {
final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
- final AmazonSNSAsyncClient sns = new AmazonSNSAsyncClient(ugProvider.getCredentials(), executor);
+ final AmazonSNSAsyncClient sns = new AmazonSNSAsyncClient( ugProvider.getCredentials(), executor );
- sns.setRegion(region);
+ sns.setRegion( region );
return sns;
}
@@ -326,11 +339,8 @@ public class SNSQueueManagerImpl implements QueueManager {
/**
* Create the async sqs client
- * @param region
- * @param executor
- * @return
*/
- private AmazonSQSAsyncClient createAsyncSQSClient(final Region region, final ExecutorService executor){
+ private AmazonSQSAsyncClient createAsyncSQSClient( final Region region, final ExecutorService executor ) {
final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
final AmazonSQSAsyncClient sqs = new AmazonSQSAsyncClient( ugProvider.getCredentials(), executor );
@@ -338,173 +348,209 @@ public class SNSQueueManagerImpl implements QueueManager {
sqs.setRegion( region );
return sqs;
-
}
+
/**
* The Synchronous SNS client is used for creating topics and subscribing queues.
- *
*/
- private AmazonSNSClient createSNSClient(final Region region) {
+ private AmazonSNSClient createSNSClient( final Region region ) {
final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
- final AmazonSNSClient sns = new AmazonSNSClient(ugProvider.getCredentials());
+ final AmazonSNSClient sns = new AmazonSNSClient( ugProvider.getCredentials() );
- sns.setRegion(region);
+ sns.setRegion( region );
return sns;
}
private String getName() {
- String name = clusterFig.getClusterName() + "_" + cassandraFig.getApplicationKeyspace() + "_" + scope.getName() + "_" + scope.getRegionImplementation();
+ String name =
+ clusterFig.getClusterName() + "_" + cassandraFig.getApplicationKeyspace() + "_" + scope.getName() + "_"
+ + scope.getRegionImplementation();
name = name.toLowerCase(); //user lower case values
- Preconditions.checkArgument(name.length() <= 80, "Your name must be < than 80 characters");
+ Preconditions.checkArgument( name.length() <= 80, "Your name must be < than 80 characters" );
return name;
}
+
public Queue getReadQueue() {
String queueName = getName();
try {
- return readQueueUrlMap.get(queueName);
- } catch (ExecutionException ee) {
- throw new RuntimeException(ee);
+ return readQueueUrlMap.get( queueName );
+ }
+ catch ( ExecutionException ee ) {
+ throw new RuntimeException( ee );
}
}
+
public String getWriteTopicArn() {
try {
- return writeTopicArnMap.get(getName());
-
- } catch (ExecutionException ee) {
- throw new RuntimeException(ee);
+ return writeTopicArnMap.get( getName() );
+ }
+ catch ( ExecutionException ee ) {
+ throw new RuntimeException( ee );
}
}
+
@Override
- public rx.Observable<QueueMessage> getMessages(final int limit,
- final int transactionTimeout,
- final int waitTime,
- final Class klass) {
+ public rx.Observable<QueueMessage> getMessages( final int limit, final int transactionTimeout, final int waitTime,
+ final Class klass ) {
- if (sqs == null) {
- logger.error("SQS is null - was not initialized properly");
+ if ( sqs == null ) {
+ logger.error( "SQS is null - was not initialized properly" );
return rx.Observable.empty();
}
String url = getReadQueue().getUrl();
- if (logger.isDebugEnabled()) logger.debug("Getting up to {} messages from {}", limit, url);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "Getting up to {} messages from {}", limit, url );
+ }
- ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(url);
- receiveMessageRequest.setMaxNumberOfMessages(limit);
- receiveMessageRequest.setVisibilityTimeout(Math.max(1, transactionTimeout / 1000));
- receiveMessageRequest.setWaitTimeSeconds(waitTime / 1000);
+ ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest( url );
+ receiveMessageRequest.setMaxNumberOfMessages( limit );
+ receiveMessageRequest.setVisibilityTimeout( Math.max( 1, transactionTimeout / 1000 ) );
+ receiveMessageRequest.setWaitTimeSeconds( waitTime / 1000 );
try {
- ReceiveMessageResult result = sqs.receiveMessage(receiveMessageRequest);
+ ReceiveMessageResult result = sqs.receiveMessage( receiveMessageRequest );
List<Message> messages = result.getMessages();
- if (logger.isDebugEnabled()) logger.debug("Received {} messages from {}", messages.size(), url);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "Received {} messages from {}", messages.size(), url );
+ }
+
+ List<QueueMessage> queueMessages = new ArrayList<>( messages.size() );
- List<QueueMessage> queueMessages = new ArrayList<>(messages.size());
+ for ( Message message : messages ) {
- for (Message message : messages) {
- Object body;
+ Object payload;
final String originalBody = message.getBody();
try {
- final JsonNode bodyNode = mapper.readTree(message.getBody());
- JsonNode bodyObj = bodyNode.has("Message") ? bodyNode.get("Message") : bodyNode;
+ final JsonNode bodyNode = mapper.readTree( message.getBody() );
+ /**
+ * When a message originates from SNS it has a "Message" field; we have to extract
+ * it and then process it separately
+ */
- final String bodyText = mapper.writeValueAsString( bodyObj );;
+ if ( bodyNode.has( "Message" ) ) {
+ final String snsNode = bodyNode.get( "Message" ).asText();
- body = fromString(bodyText, klass);
- } catch (Exception e) {
- logger.error(String.format("failed to deserialize message: %s", message.getBody()), e);
- throw new RuntimeException(e);
+ payload = deSerializeSQSMessage( snsNode, klass );
+ }
+ else {
+ payload = deSerializeSQSMessage( originalBody, klass );
+ }
+ }
+ catch ( Exception e ) {
+ logger.error( String.format( "failed to deserialize message: %s", message.getBody() ), e );
+ throw new RuntimeException( e );
}
- QueueMessage queueMessage = new QueueMessage(message.getMessageId(), message.getReceiptHandle(), body, message.getAttributes().get("type"));
- queueMessage.setStringBody(originalBody);
- queueMessages.add(queueMessage);
+ QueueMessage queueMessage = new QueueMessage( message.getMessageId(), message.getReceiptHandle(), payload,
+ message.getAttributes().get( "type" ) );
+ queueMessage.setStringBody( originalBody );
+ queueMessages.add( queueMessage );
}
- return rx.Observable.from(queueMessages);
-
- } catch (com.amazonaws.services.sqs.model.QueueDoesNotExistException dne) {
- logger.error(String.format("Queue does not exist! [%s]", url), dne);
- } catch (Exception e) {
- logger.error(String.format("Programming error getting messages from queue=[%s] exist!", url), e);
+ return rx.Observable.from( queueMessages );
+ }
+ catch ( com.amazonaws.services.sqs.model.QueueDoesNotExistException dne ) {
+ logger.error( String.format( "Queue does not exist! [%s]", url ), dne );
+ }
+ catch ( Exception e ) {
+ logger.error( String.format( "Programming error getting messages from queue=[%s] exist!", url ), e );
}
- return rx.Observable.from(new ArrayList<>(0));
+ return rx.Observable.from( new ArrayList<>( 0 ) );
}
+
+ /**
+ * Take a string, possibly escaped via SNS, and run it through our mapper to create an object
+ */
+ private Object deSerializeSQSMessage( final String message, final Class type ) {
+ try {
+ final Object o = mapper.readValue( message, type );
+ return o;
+ }
+ catch ( Exception e ) {
+ throw new RuntimeException( "Unable to deserialize message " + message + " for class " + type, e );
+ }
+ }
+
+
@Override
public long getQueueDepth() {
String key = "ApproximateNumberOfMessages";
try {
- GetQueueAttributesResult result = sqs.getQueueAttributes(getReadQueue().getUrl(), Collections.singletonList(key));
- String depthString = result.getAttributes().get(key);
- return depthString != null ? Long.parseLong(depthString) : 0;
- }catch (Exception e){
- logger.error("Exception getting queue depth",e);
+ GetQueueAttributesResult result =
+ sqs.getQueueAttributes( getReadQueue().getUrl(), Collections.singletonList( key ) );
+ String depthString = result.getAttributes().get( key );
+ return depthString != null ? Long.parseLong( depthString ) : 0;
+ }
+ catch ( Exception e ) {
+ logger.error( "Exception getting queue depth", e );
return -1;
-
}
}
@Override
public <T extends Serializable> void sendMessageToTopic( final T body ) throws IOException {
- if (snsAsync == null) {
- logger.error("SNS client is null, perhaps it failed to initialize successfully");
- return;
- }
-
- final String stringBody = toString(body);
+ if ( snsAsync == null ) {
+ logger.error( "SNS client is null, perhaps it failed to initialize successfully" );
+ return;
+ }
- String topicArn = getWriteTopicArn();
+ final String stringBody = toString( body );
- if (logger.isDebugEnabled()) logger.debug("Publishing Message...{} to arn: {}", stringBody, topicArn);
+ String topicArn = getWriteTopicArn();
- PublishRequest publishRequest = new PublishRequest(topicArn, stringBody);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "Publishing Message...{} to arn: {}", stringBody, topicArn );
+ }
- snsAsync.publishAsync( publishRequest, new AsyncHandler<PublishRequest, PublishResult>() {
- @Override
- public void onError( Exception e ) {
- logger.error( "Error publishing message... {}", e );
- }
+ PublishRequest publishRequest = new PublishRequest( topicArn, stringBody );
+ snsAsync.publishAsync( publishRequest, new AsyncHandler<PublishRequest, PublishResult>() {
+ @Override
+ public void onError( Exception e ) {
+ logger.error( "Error publishing message... {}", e );
+ }
- @Override
- public void onSuccess( PublishRequest request, PublishResult result ) {
- if ( logger.isDebugEnabled() ) logger
- .debug( "Successfully published... messageID=[{}], arn=[{}]", result.getMessageId(),
- request.getTopicArn() );
- }
- } );
+ @Override
+ public void onSuccess( PublishRequest request, PublishResult result ) {
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "Successfully published... messageID=[{}], arn=[{}]", result.getMessageId(),
+ request.getTopicArn() );
+ }
+ }
+ } );
}
@Override
- public void sendMessages(final List bodies) throws IOException {
+ public void sendMessages( final List bodies ) throws IOException {
- if (snsAsync == null) {
- logger.error("SNS client is null, perhaps it failed to initialize successfully");
+ if ( snsAsync == null ) {
+ logger.error( "SNS client is null, perhaps it failed to initialize successfully" );
return;
}
- for (Object body : bodies) {
- sendMessage((Serializable)body);
+ for ( Object body : bodies ) {
+ sendMessage( ( Serializable ) body );
}
-
}
@@ -545,94 +591,81 @@ public class SNSQueueManagerImpl implements QueueManager {
} );
}
+
@Override
public void deleteQueue() {
- logger.warn("Deleting queue: "+getReadQueue().getUrl());
- sqs.deleteQueue(new DeleteQueueRequest().withQueueUrl(getReadQueue().getUrl()));
- logger.warn("Deleting queue: "+getReadQueue().getUrl()+"_dead");
- sqs.deleteQueue(new DeleteQueueRequest().withQueueUrl(getReadQueue().getUrl()+"_dead"));
-
+ logger.warn( "Deleting queue: " + getReadQueue().getUrl() );
+ sqs.deleteQueue( new DeleteQueueRequest().withQueueUrl( getReadQueue().getUrl() ) );
+ logger.warn( "Deleting queue: " + getReadQueue().getUrl() + "_dead" );
+ sqs.deleteQueue( new DeleteQueueRequest().withQueueUrl( getReadQueue().getUrl() + "_dead" ) );
}
@Override
- public void commitMessage(final QueueMessage queueMessage) {
+ public void commitMessage( final QueueMessage queueMessage ) {
String url = getReadQueue().getUrl();
- if (logger.isDebugEnabled())
- logger.debug("Commit message {} to queue {}", queueMessage.getMessageId(), url);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "Commit message {} to queue {}", queueMessage.getMessageId(), url );
+ }
- sqs.deleteMessage(new DeleteMessageRequest()
- .withQueueUrl(url)
- .withReceiptHandle(queueMessage.getHandle()));
+ sqs.deleteMessage(
+ new DeleteMessageRequest().withQueueUrl( url ).withReceiptHandle( queueMessage.getHandle() ) );
}
@Override
- public void commitMessages(final List<QueueMessage> queueMessages) {
+ public void commitMessages( final List<QueueMessage> queueMessages ) {
String url = getReadQueue().getUrl();
- if (logger.isDebugEnabled()) logger.debug("Commit messages {} to queue {}", queueMessages.size(), url);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "Commit messages {} to queue {}", queueMessages.size(), url );
+ }
List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
- for (QueueMessage message : queueMessages) {
- entries.add(new DeleteMessageBatchRequestEntry(message.getMessageId(), message.getHandle()));
+ for ( QueueMessage message : queueMessages ) {
+ entries.add( new DeleteMessageBatchRequestEntry( message.getMessageId(), message.getHandle() ) );
}
- DeleteMessageBatchRequest request = new DeleteMessageBatchRequest(url, entries);
- DeleteMessageBatchResult result = sqs.deleteMessageBatch(request);
+ DeleteMessageBatchRequest request = new DeleteMessageBatchRequest( url, entries );
+ DeleteMessageBatchResult result = sqs.deleteMessageBatch( request );
boolean successful = result.getFailed().size() <= 0;
- if (!successful) {
- for (BatchResultErrorEntry failed : result.getFailed()) {
- logger.error("Commit failed reason: {} messages id: {}", failed.getMessage(), failed.getId());
+ if ( !successful ) {
+ for ( BatchResultErrorEntry failed : result.getFailed() ) {
+ logger.error( "Commit failed reason: {} messages id: {}", failed.getMessage(), failed.getId() );
}
}
}
/**
- * Read the object from Base64 string.
- */
-
- private Object fromString(final String s, final Class klass)
- throws IOException, ClassNotFoundException {
-
- Object o = mapper.readValue(s, klass);
- return o;
- }
-
- /**
* Serialize the object to a string using the mapper.
*/
- private String toString(final Object o) throws IOException {
- return mapper.writeValueAsString(o);
+ private String toString( final Object o ) throws IOException {
+ return mapper.writeValueAsString( o );
}
/**
* Get the region
- *
- * @return
*/
private Region getRegion() {
- Regions regions = Regions.fromName(fig.getRegion());
- return Region.getRegion(regions);
+ Regions regions = Regions.fromName( fig.getRegion() );
+ return Region.getRegion( regions );
}
/**
* Create the SQS client for the specified settings
*/
- private AmazonSQSClient createSQSClient(final Region region) {
+ private AmazonSQSClient createSQSClient( final Region region ) {
final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
- final AmazonSQSClient sqs = new AmazonSQSClient(ugProvider.getCredentials());
+ final AmazonSQSClient sqs = new AmazonSQSClient( ugProvider.getCredentials() );
- sqs.setRegion(region);
+ sqs.setRegion( region );
return sqs;
}
-
-
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
deleted file mode 100644
index 0c56c05..0000000
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ /dev/null
@@ -1,362 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.queue.impl;
-
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.*;
-import java.util.concurrent.ExecutionException;
-
-import com.amazonaws.services.sqs.model.*;
-import org.apache.usergrid.persistence.core.guicyfig.ClusterFig;
-import org.apache.usergrid.persistence.queue.util.AmazonNotificationUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.queue.Queue;
-import org.apache.usergrid.persistence.queue.QueueFig;
-import org.apache.usergrid.persistence.queue.QueueManager;
-import org.apache.usergrid.persistence.queue.QueueMessage;
-import org.apache.usergrid.persistence.queue.QueueScope;
-
-import com.amazonaws.regions.Region;
-import com.amazonaws.regions.Regions;
-import com.amazonaws.services.sqs.AmazonSQSClient;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.smile.SmileFactory;
-import com.google.common.base.Preconditions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-public class SQSQueueManagerImpl implements QueueManager {
- private static final Logger logger = LoggerFactory.getLogger(SQSQueueManagerImpl.class);
-
-
- private final QueueScope scope;
- private ObjectMapper mapper;
- protected final QueueFig fig;
- private final ClusterFig clusterFig;
- protected final AmazonSQSClient sqs;
-
- private static SmileFactory smileFactory = new SmileFactory();
-
- private LoadingCache<String, Queue> urlMap = CacheBuilder.newBuilder()
- .maximumSize(1000)
- .build(new CacheLoader<String, Queue>() {
- @Override
- public Queue load(String queueName) throws Exception {
-
- //the amazon client is not thread safe, we need to create one per queue
- Queue queue = null;
-
- try {
-
- GetQueueUrlResult result = sqs.getQueueUrl(queueName);
- queue = new Queue(result.getQueueUrl());
-
- } catch (QueueDoesNotExistException queueDoesNotExistException) {
- //no op, swallow
- logger.error("Queue {} does not exist, creating", queueName);
-
- } catch (Exception e) {
- logger.error("failed to get queue from service", e);
- throw e;
- }
-
- if (queue == null) {
-
- final String deadletterQueueName = String.format("%s_dead", queueName);
- final Map<String, String> deadLetterAttributes = new HashMap<>(2);
-
- deadLetterAttributes.put("MessageRetentionPeriod", fig.getDeadletterRetentionPeriod());
- CreateQueueRequest createDeadLetterQueueRequest = new CreateQueueRequest()
- .withQueueName(deadletterQueueName).withAttributes(deadLetterAttributes);
-
- final CreateQueueResult deadletterResult = sqs.createQueue(createDeadLetterQueueRequest);
- logger.info("Created deadletter queue with url {}", deadletterResult.getQueueUrl());
-
- final String deadletterArn = AmazonNotificationUtils.getQueueArnByName(sqs, deadletterQueueName);
-
- String redrivePolicy = String.format("{\"maxReceiveCount\":\"%s\"," +
- " \"deadLetterTargetArn\":\"%s\"}", fig.getQueueDeliveryLimit(), deadletterArn);
-
- final Map<String, String> queueAttributes = new HashMap<>(2);
- deadLetterAttributes.put("MessageRetentionPeriod", fig.getRetentionPeriod());
- deadLetterAttributes.put("RedrivePolicy", redrivePolicy);
-
- CreateQueueRequest createQueueRequest = new CreateQueueRequest().
- withQueueName(queueName)
- .withAttributes(queueAttributes);
-
- CreateQueueResult result = sqs.createQueue(createQueueRequest);
-
- String url = result.getQueueUrl();
- queue = new Queue(url);
-
- logger.info("Created queue with url {}", url);
- }
-
- return queue;
- }
- });
-
-
- @Inject
- public SQSQueueManagerImpl(@Assisted QueueScope scope, final QueueFig fig, final ClusterFig clusterFig) {
-
- this.scope = scope;
- this.fig = fig;
- this.clusterFig = clusterFig;
- try {
-
- smileFactory.delegateToTextual(true);
- mapper = new ObjectMapper(smileFactory);
- //pretty print, disabling for speed
-// mapper.enable(SerializationFeature.INDENT_OUTPUT);
- mapper.enableDefaultTypingAsProperty(ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class");
- sqs = createClient();
-
- } catch (Exception e) {
- throw new RuntimeException("Error setting up mapper", e);
- }
- }
-
-
- protected String getName() {
-
- String name = clusterFig.getClusterName() + "_" + scope.getName();
-
- Preconditions.checkArgument(name.length() <= 80, "Your name must be < than 80 characters");
-
- return name;
- }
-
- public Queue getQueue() {
-
- try {
- Queue queue = urlMap.get(getName());
- return queue;
- } catch (ExecutionException ee) {
- throw new RuntimeException(ee);
- }
- }
-
- @Override
- public rx.Observable<QueueMessage> getMessages(final int limit,
- final int transactionTimeout,
- final int waitTime,
- final Class klass) {
-
- if (sqs == null) {
- logger.error("Sqs is null");
- return rx.Observable.empty();
- }
-
- String url = getQueue().getUrl();
-
- if (logger.isDebugEnabled()) logger.debug("Getting Max {} messages from {}", limit, url);
-
- ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(url);
- receiveMessageRequest.setMaxNumberOfMessages(limit);
- receiveMessageRequest.setVisibilityTimeout(transactionTimeout / 1000);
- receiveMessageRequest.setWaitTimeSeconds(waitTime / 1000);
- ReceiveMessageResult result = sqs.receiveMessage(receiveMessageRequest);
- List<Message> messages = result.getMessages();
-
- if (logger.isDebugEnabled()) logger.debug("Received {} messages from {}", messages.size(), url);
-
- List<QueueMessage> queueMessages = new ArrayList<>(messages.size());
-
- for (Message message : messages) {
- Object body;
-
- try {
- body = fromString(message.getBody(), klass);
- } catch (Exception e) {
- logger.error("failed to deserialize message", e);
- throw new RuntimeException(e);
- }
-
- QueueMessage queueMessage = new QueueMessage(message.getMessageId(), message.getReceiptHandle(), body, message.getAttributes().get("type"));
- queueMessage.setStringBody(message.getBody());
- queueMessages.add(queueMessage);
- }
-
- return rx.Observable.from(queueMessages);
- }
-
- @Override
- public long getQueueDepth() {
- String key = "ApproximateNumberOfMessages";
- try {
- GetQueueAttributesResult result = sqs.getQueueAttributes(new GetQueueAttributesRequest().withAttributeNames(key));
- String depthString = result.getAttributes().get(key);
- return depthString != null ? Long.parseLong(depthString) : 0;
- }catch (Exception e){
- logger.error("Exception getting queue depth",e);
- return -1;
-
- }
- }
- @Override
- public void sendMessages(final List bodies) throws IOException {
-
- if (sqs == null) {
- logger.error("Sqs is null");
- return;
- }
- String url = getQueue().getUrl();
-
- if (logger.isDebugEnabled()) logger.debug("Sending Messages...{} to {}", bodies.size(), url);
-
- SendMessageBatchRequest request = new SendMessageBatchRequest(url);
- List<SendMessageBatchRequestEntry> entries = new ArrayList<>(bodies.size());
-
- for (Object body : bodies) {
- SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry();
- entry.setId(UUID.randomUUID().toString());
- entry.setMessageBody(toString(body));
- entry.addMessageAttributesEntry("type", new MessageAttributeValue().withStringValue("mytype"));
- entries.add(entry);
- }
-
- request.setEntries(entries);
- sqs.sendMessageBatch(request);
-
- }
-
-
- @Override
- public <T extends Serializable> void sendMessage( final T body ) throws IOException {
-
- if (sqs == null) {
- logger.error("Sqs is null");
- return;
- }
-
- String url = getQueue().getUrl();
-
- if (logger.isDebugEnabled()) logger.debug("Sending Message...{} to {}", body.toString(), url);
-
- final String stringBody = toString(body);
-
- SendMessageRequest request = new SendMessageRequest(url, stringBody);
- sqs.sendMessage(request);
- }
-
-
-
- @Override
- public <T extends Serializable> void sendMessageToTopic( final T body ) throws IOException {
- sendMessage( body );
- }
-
-
-
- @Override
- public void commitMessage(final QueueMessage queueMessage) {
-
- String url = getQueue().getUrl();
- if (logger.isDebugEnabled()) logger.debug("Commit message {} to queue {}", queueMessage.getMessageId(), url);
-
- sqs.deleteMessage(new DeleteMessageRequest()
- .withQueueUrl(url)
- .withReceiptHandle(queueMessage.getHandle()));
- }
-
-
- @Override
- public void commitMessages(final List<QueueMessage> queueMessages) {
-
- String url = getQueue().getUrl();
- if (logger.isDebugEnabled()) logger.debug("Commit messages {} to queue {}", queueMessages.size(), url);
-
- List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
-
- for (QueueMessage message : queueMessages) {
- entries.add(new DeleteMessageBatchRequestEntry(message.getMessageId(), message.getHandle()));
- }
-
- DeleteMessageBatchRequest request = new DeleteMessageBatchRequest(url, entries);
- DeleteMessageBatchResult result = sqs.deleteMessageBatch(request);
-
- boolean successful = result.getFailed().size() <= 0;
-
- if (!successful) {
-
- for (BatchResultErrorEntry failed : result.getFailed()) {
- logger.error("Commit failed reason: {} messages id: {}", failed.getMessage(), failed.getId());
- }
- }
- }
-
-
- /**
- * Read the object from Base64 string.
- */
- private Object fromString(final String s,
- final Class klass) throws IOException, ClassNotFoundException {
- Object o = mapper.readValue(s, klass);
- return o;
- }
-
- /**
- * Write the object to a Base64 string.
- */
- protected String toString(final Object o) throws IOException {
- return mapper.writeValueAsString(o);
- }
-
-
- /**
- * Get the region
- *
- * @return
- */
- protected Region getRegion() {
- Regions regions = Regions.fromName(fig.getRegion());
- Region region = Region.getRegion(regions);
- return region;
- }
-
- @Override
- public void deleteQueue() {
- logger.warn("Deleting queue: "+getQueue().getUrl());
- sqs.deleteQueue(new DeleteQueueRequest().withQueueUrl(getQueue().getUrl()));
- }
-
-
-
- /**
- * Create the SQS client for the specified settings
- */
- private AmazonSQSClient createClient() {
- final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
- final AmazonSQSClient sqs = new AmazonSQSClient(ugProvider.getCredentials());
- final Region region = getRegion();
- sqs.setRegion(region);
-
- return sqs;
- }
-
-
-}
[10/12] usergrid git commit: Add null check back for batch sending of
SQS w/proper SQS client. Less logging would occur in the event of batch send
when the client is not initialized.
Posted by mr...@apache.org.
Add null check back for batch sending of SQS w/proper SQS client. Less logging would occur in the event of batch send when the client is not initialized.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/fbb6c823
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/fbb6c823
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/fbb6c823
Branch: refs/heads/2.1-release
Commit: fbb6c823b7e2a1f5f55ab044942b38d1157970c0
Parents: d8e6572
Author: Michael Russo <mi...@gmail.com>
Authored: Mon Oct 19 21:28:46 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Mon Oct 19 21:28:46 2015 -0700
----------------------------------------------------------------------
.../usergrid/persistence/queue/impl/SNSQueueManagerImpl.java | 5 +++++
1 file changed, 5 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/fbb6c823/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 3c18992..1bb00dc 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -543,6 +543,11 @@ public class SNSQueueManagerImpl implements QueueManager {
@Override
public void sendMessages( final List bodies ) throws IOException {
+ if ( sqsAsync == null ) {
+ logger.error( "SQS client is null, perhaps it failed to initialize successfully" );
+ return;
+ }
+
for ( Object body : bodies ) {
sendMessage( ( Serializable ) body );
}
[05/12] usergrid git commit: Makes consistency configurable
Posted by mr...@apache.org.
Makes consistency configurable
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/19d30eaf
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/19d30eaf
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/19d30eaf
Branch: refs/heads/2.1-release
Commit: 19d30eafc77095ee74ae126f9d0a849e997b6ad7
Parents: 0326629
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Oct 19 13:59:08 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Oct 19 13:59:08 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 4 +---
.../map/impl/MapSerializationImpl.java | 21 ++++++++++----------
2 files changed, 12 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/19d30eaf/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 6b2eb45..67d0dab 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -29,9 +29,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import com.google.common.base.Optional;
-import org.apache.usergrid.persistence.index.impl.IndexProducer;
-import org.apache.usergrid.persistence.queue.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,6 +65,7 @@ import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import org.apache.usergrid.persistence.queue.QueueFig;
import org.apache.usergrid.persistence.queue.QueueManager;
import org.apache.usergrid.persistence.queue.QueueManagerFactory;
import org.apache.usergrid.persistence.queue.QueueMessage;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/19d30eaf/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
index 1aa3229..ffe10c9 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
@@ -33,6 +33,8 @@ import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.usergrid.persistence.core.astyanax.BucketScopedRowKey;
import org.apache.usergrid.persistence.core.astyanax.BucketScopedRowKeySerializer;
+import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
+import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
@@ -105,15 +107,9 @@ public class MapSerializationImpl implements MapSerialization {
/**
* How to funnel keys for buckets
*/
- private static final Funnel<String> MAP_KEY_FUNNEL = new Funnel<String>() {
+ private static final Funnel<String> MAP_KEY_FUNNEL = ( key, into ) -> into.putString( key, StringHashUtils.UTF8 );
- @Override
- public void funnel( final String key, final PrimitiveSink into ) {
- into.putString( key, StringHashUtils.UTF8 );
- }
- };
-
/**
* Locator to get us all buckets
*/
@@ -121,10 +117,14 @@ public class MapSerializationImpl implements MapSerialization {
new ExpandingShardLocator<>( MAP_KEY_FUNNEL, NUM_BUCKETS );
private final Keyspace keyspace;
+ private final CassandraConfig cassandraConfig;
@Inject
- public MapSerializationImpl( final Keyspace keyspace ) {this.keyspace = keyspace;}
+ public MapSerializationImpl( final Keyspace keyspace, final CassandraConfig cassandraConfig ) {
+ this.keyspace = keyspace;
+ this.cassandraConfig = cassandraConfig;
+ }
@Override
@@ -387,7 +387,7 @@ public class MapSerializationImpl implements MapSerialization {
//now get all columns, including the "old row key value"
try {
final Column<Boolean> result =
- keyspace.prepareQuery( MAP_ENTRIES ).setConsistencyLevel( ConsistencyLevel.CL_QUORUM )
+ keyspace.prepareQuery( MAP_ENTRIES ).setConsistencyLevel( cassandraConfig.getConsistentReadCL() )
.getKey( entryRowKey ).getColumn( true ).execute().getResult();
return result;
@@ -421,7 +421,8 @@ public class MapSerializationImpl implements MapSerialization {
//now get all columns, including the "old row key value"
try {
final Rows<ScopedRowKey<MapEntryKey>, Boolean> rows =
- keyspace.prepareQuery( MAP_ENTRIES ).getKeySlice( rowKeys ).withColumnSlice( true ).execute()
+ keyspace.prepareQuery( MAP_ENTRIES ).setConsistencyLevel( cassandraConfig.getReadCL() ).getKeySlice(
+ rowKeys ).withColumnSlice( true ).execute()
.getResult();
[08/12] usergrid git commit: Add sourceRegion to
ElasticsearchIndexEvent, fix logging statements,
update sqs/sns client null checks.
Posted by mr...@apache.org.
Add sourceRegion to ElasticsearchIndexEvent, fix logging statements, update sqs/sns client null checks.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/3a7e60b3
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/3a7e60b3
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/3a7e60b3
Branch: refs/heads/2.1-release
Commit: 3a7e60b3131e207890354ca5fa84258795296372
Parents: 19d30ea
Author: Michael Russo <mi...@gmail.com>
Authored: Mon Oct 19 17:07:18 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Mon Oct 19 17:07:18 2015 -0700
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 9 ++++++---
.../asyncevents/model/ElasticsearchIndexEvent.java | 3 ++-
.../persistence/queue/impl/SNSQueueManagerImpl.java | 9 ++-------
3 files changed, 10 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3a7e60b3/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 67d0dab..2b583b5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -498,7 +498,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
//now queue up the index message
- final ElasticsearchIndexEvent elasticsearchIndexEvent = new ElasticsearchIndexEvent( newMessageId );
+ final ElasticsearchIndexEvent elasticsearchIndexEvent =
+ new ElasticsearchIndexEvent(queueFig.getPrimaryRegion(), newMessageId );
//send to the topic so all regions index the batch
@@ -520,12 +521,14 @@ public class AmazonAsyncEventService implements AsyncEventService {
final IndexOperationMessage indexOperationMessage;
if(message == null){
- logger.error( "Received message with id {} to process, unable to find it, reading with higher consistency level" );
+ logger.error( "Received message with id {} to process, unable to find it, reading with higher consistency level",
+ messageId);
final String highConsistency = esMapPersistence.getStringHighConsistency( messageId.toString() );
if(highConsistency == null){
- logger.error( "Unable to find the ES batch with id {} to process at a higher consistency level" );
+ logger.error( "Unable to find the ES batch with id {} to process at a higher consistency level",
+ messageId);
throw new RuntimeException( "Unable to find the ES batch to process with message id " + messageId );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3a7e60b3/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java
index 207b15e..049c3a5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java
@@ -35,7 +35,8 @@ public final class ElasticsearchIndexEvent extends AsyncEvent {
public ElasticsearchIndexEvent() {
}
- public ElasticsearchIndexEvent( UUID indexBatchId ) {
+ public ElasticsearchIndexEvent(String sourceRegion, UUID indexBatchId) {
+ super(sourceRegion);
this.indexBatchId = indexBatchId;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3a7e60b3/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 58b2a4d..3c18992 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -543,11 +543,6 @@ public class SNSQueueManagerImpl implements QueueManager {
@Override
public void sendMessages( final List bodies ) throws IOException {
- if ( snsAsync == null ) {
- logger.error( "SNS client is null, perhaps it failed to initialize successfully" );
- return;
- }
-
for ( Object body : bodies ) {
sendMessage( ( Serializable ) body );
}
@@ -557,8 +552,8 @@ public class SNSQueueManagerImpl implements QueueManager {
@Override
public <T extends Serializable> void sendMessage( final T body ) throws IOException {
- if ( snsAsync == null ) {
- logger.error( "SNS client is null, perhaps it failed to initialize successfully" );
+ if ( sqsAsync == null ) {
+ logger.error( "SQS client is null, perhaps it failed to initialize successfully" );
return;
}
[07/12] usergrid git commit: Updates the defaults to be more sensible
in a multi-region environment
Posted by mr...@apache.org.
Updates the defaults to be more sensible in a multi-region environment
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/3ec0f588
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/3ec0f588
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/3ec0f588
Branch: refs/heads/2.1-release
Commit: 3ec0f5886b82737d4a7ed64fae01afbdb6707763
Parents: 4013f17
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Oct 19 17:44:39 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Oct 19 17:44:39 2015 -0600
----------------------------------------------------------------------
.../corepersistence/index/PublishRxTest.java | 95 ----------------
.../usergrid/corepersistence/index/RxTest.java | 108 +++++++++++++++++++
.../persistence/core/astyanax/CassandraFig.java | 6 +-
3 files changed, 111 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3ec0f588/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/PublishRxTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/PublishRxTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/PublishRxTest.java
deleted file mode 100644
index 973a42d..0000000
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/PublishRxTest.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.index;
-
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.Ignore;
-import org.junit.Test;
-
-import rx.Observable;
-import rx.Subscription;
-import rx.observables.ConnectableObservable;
-import rx.schedulers.Schedulers;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-
-/**
- * Test to test some assumptions about RX behaviors
- */
-public class PublishRxTest {
-
- @Test
- public void testPublish() throws InterruptedException {
-
- final int count = 10;
-
- final CountDownLatch latch = new CountDownLatch( count );
-
- final Subscription connectedObservable =
- Observable.range( 0, count ).doOnNext( integer -> latch.countDown() ).subscribeOn( Schedulers.io() )
- .subscribe();
-
-
- final boolean completed = latch.await( 5, TimeUnit.SECONDS );
-
- assertTrue( "publish1 behaves as expected", completed );
-
- final boolean completedSubscription = connectedObservable.isUnsubscribed();
-
- assertTrue( "Subscription complete", completedSubscription );
- }
-
-
- @Test
- @Ignore("This seems like it should work, yet blocks forever")
- public void testConnectableObserver() throws InterruptedException {
-
- final int count = 10;
-
- final CountDownLatch latch = new CountDownLatch( count );
-
- final ConnectableObservable<Integer> connectedObservable = Observable.range( 0, count ).publish();
-
-
- //connect to our latch, which should run on it's own subscription
- //start our latch running
- connectedObservable.doOnNext( integer -> latch.countDown() ).subscribeOn( Schedulers.io() ).subscribe();
-
-
- final Observable<Integer> countObservable = connectedObservable.subscribeOn( Schedulers.io() ).count();
-
- //start the sequence
- connectedObservable.connect();
-
-
- final boolean completed = latch.await( 5, TimeUnit.SECONDS );
-
- assertTrue( "publish1 behaves as expected", completed );
-
- final int returnedCount = countObservable.toBlocking().last();
-
- assertEquals( "Counts the same", count, returnedCount );
- }
-}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3ec0f588/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java
new file mode 100644
index 0000000..1d940d0
--- /dev/null
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+import rx.Observable;
+import rx.Subscription;
+import rx.observables.ConnectableObservable;
+import rx.schedulers.Schedulers;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Tests that verify some assumptions about RxJava behavior
+ */
+public class RxTest {
+
+ @Test
+ public void testPublish() throws InterruptedException {
+
+ final int count = 10;
+
+ final CountDownLatch latch = new CountDownLatch( count );
+
+ final Subscription connectedObservable =
+ Observable.range( 0, count ).doOnNext( integer -> latch.countDown() ).subscribeOn( Schedulers.io() )
+ .subscribe();
+
+
+ final boolean completed = latch.await( 5, TimeUnit.SECONDS );
+
+ assertTrue( "publish1 behaves as expected", completed );
+
+ final boolean completedSubscription = connectedObservable.isUnsubscribed();
+
+ assertTrue( "Subscription complete", completedSubscription );
+ }
+
+
+ @Test
+ @Ignore("This seems like it should work, yet blocks forever")
+ public void testConnectableObserver() throws InterruptedException {
+
+ final int count = 10;
+
+ final CountDownLatch latch = new CountDownLatch( count );
+
+ final ConnectableObservable<Integer> connectedObservable = Observable.range( 0, count ).publish();
+
+
+ //connect to our latch, which should run on its own subscription
+ //start our latch running
+ connectedObservable.doOnNext( integer -> latch.countDown() ).subscribeOn( Schedulers.io() ).subscribe();
+
+
+ final Observable<Integer> countObservable = connectedObservable.subscribeOn( Schedulers.io() ).count();
+
+ //start the sequence
+ connectedObservable.connect();
+
+
+ final boolean completed = latch.await( 5, TimeUnit.SECONDS );
+
+ assertTrue( "publish1 behaves as expected", completed );
+
+ final int returnedCount = countObservable.toBlocking().last();
+
+ assertEquals( "Counts the same", count, returnedCount );
+ }
+
+
+ /**
+ * Tests that reduce emits its seed value when the source observable is empty
+ */
+ @Test
+ public void testReduceEmpty(){
+ final int result = Observable.range( 0, 100 ).filter( value -> value == -1 ).reduce( 0, ( integer, integer2 ) -> integer + 1 ).toBlocking().last();
+
+ assertEquals(0, result);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3ec0f588/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
index 79c198f..e98e0fd 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
@@ -79,15 +79,15 @@ public interface CassandraFig extends GuicyFig {
String getDiscoveryType();
- @Default("CL_LOCAL_ONE")
+ @Default("CL_LOCAL_QUORUM")
@Key(READ_CL)
String getReadCL();
- @Default("CL_LOCAL_QUORUM")
+ @Default("CL_QUORUM")
@Key(READ_CONSISTENT_CL)
String getConsistentReadCL();
- @Default("CL_QUORUM")
+ @Default("CL_LOCAL_QUORUM")
@Key(WRITE_CL)
String getWriteCL();
[11/12] usergrid git commit: Fixes comments and refactors map to be a
cleaner read pattern
Posted by mr...@apache.org.
Fixes comments and refactors map to be a cleaner read pattern
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/e50835f1
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/e50835f1
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/e50835f1
Branch: refs/heads/2.1-release
Commit: e50835f1016a9988513a00b64337222957fa7747
Parents: fbb6c82
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Oct 20 09:34:43 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Oct 20 09:34:43 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 2 +-
.../core/astyanax/CassandraConfig.java | 8 ++---
.../core/astyanax/CassandraConfigImpl.java | 1 +
.../map/impl/MapSerializationImpl.java | 37 ++++----------------
.../queue/impl/SNSQueueManagerImpl.java | 4 +--
5 files changed, 14 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/e50835f1/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 38c2966..6f779b5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -521,7 +521,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
final IndexOperationMessage indexOperationMessage;
if(message == null){
- logger.error( "Received message with id {} to process, unable to find it, reading with higher consistency level",
+ logger.warn( "Received message with id {} to process, unable to find it, reading with higher consistency level",
messageId);
final String highConsistency = esMapPersistence.getStringHighConsistency( messageId.toString() );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/e50835f1/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java
index 817aee2..dba3646 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java
@@ -34,25 +34,25 @@ public interface CassandraConfig {
* Get the currently configured ReadCL
* @return
*/
- public ConsistencyLevel getReadCL();
+ ConsistencyLevel getReadCL();
/**
* Get the currently configured ReadCL that is more consitent than getReadCL
* @return
*/
- public ConsistencyLevel getConsistentReadCL();
+ ConsistencyLevel getConsistentReadCL();
/**
* Get the currently configured write CL
* @return
*/
- public ConsistencyLevel getWriteCL();
+ ConsistencyLevel getWriteCL();
/**
* Return the number of shards that has been set in the property file
* @return
*/
- public int[] getShardSettings();
+ int[] getShardSettings();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/e50835f1/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfigImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfigImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfigImpl.java
index 17b91c6..7373322 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfigImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfigImpl.java
@@ -83,6 +83,7 @@ public class CassandraConfigImpl implements CassandraConfig {
public ConsistencyLevel getConsistentReadCL() {
return consistentCl;
}
+
@Override
public ConsistencyLevel getWriteCL() {
return writeCl;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/e50835f1/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
index ffe10c9..ceeb3ad 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
@@ -129,14 +129,14 @@ public class MapSerializationImpl implements MapSerialization {
@Override
public String getString( final MapScope scope, final String key ) {
- Column<Boolean> col = getValue( scope, key );
+ Column<Boolean> col = getValue( scope, key, cassandraConfig.getReadCL() );
return ( col != null ) ? col.getStringValue() : null;
}
@Override
public String getStringHighConsistency( final MapScope scope, final String key ) {
- Column<Boolean> col = getValueHighConsistency( scope, key ); // TODO: why boolean?
+ Column<Boolean> col = getValue( scope, key, cassandraConfig.getConsistentReadCL() ); // TODO: why boolean?
return ( col != null ) ? col.getStringValue() : null;
}
@@ -248,7 +248,7 @@ public class MapSerializationImpl implements MapSerialization {
@Override
public UUID getUuid( final MapScope scope, final String key ) {
- Column<Boolean> col = getValue( scope, key );
+ Column<Boolean> col = getValue( scope, key, cassandraConfig.getReadCL() );
return ( col != null ) ? col.getUUIDValue() : null;
}
@@ -283,7 +283,7 @@ public class MapSerializationImpl implements MapSerialization {
@Override
public Long getLong( final MapScope scope, final String key ) {
- Column<Boolean> col = getValue( scope, key );
+ Column<Boolean> col = getValue( scope, key, cassandraConfig.getReadCL() );
return ( col != null ) ? col.getLongValue() : null;
}
@@ -355,31 +355,7 @@ public class MapSerializationImpl implements MapSerialization {
}
- private Column<Boolean> getValue( MapScope scope, String key ) {
-
-
- //add it to the entry
- final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
-
- //now get all columns, including the "old row key value"
- try {
- final Column<Boolean> result =
- keyspace.prepareQuery( MAP_ENTRIES ).getKey( entryRowKey ).getColumn( true ).execute().getResult();
-
- return result;
- }
- catch ( NotFoundException nfe ) {
- //nothing to return
- return null;
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to connect to cassandra", e );
- }
- }
-
-
- private Column<Boolean> getValueHighConsistency( MapScope scope, String key ) {
-
+ private Column<Boolean> getValue( MapScope scope, String key, final ConsistencyLevel consistencyLevel ) {
//add it to the entry
final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
@@ -387,8 +363,7 @@ public class MapSerializationImpl implements MapSerialization {
//now get all columns, including the "old row key value"
try {
final Column<Boolean> result =
- keyspace.prepareQuery( MAP_ENTRIES ).setConsistencyLevel( cassandraConfig.getConsistentReadCL() )
- .getKey( entryRowKey ).getColumn( true ).execute().getResult();
+ keyspace.prepareQuery( MAP_ENTRIES ).setConsistencyLevel( consistencyLevel ).getKey( entryRowKey ).getColumn( true ).execute().getResult();
return result;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/e50835f1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 1bb00dc..bc5f2f1 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -438,7 +438,7 @@ public class SNSQueueManagerImpl implements QueueManager {
/**
* When a message originates from SNS it has a "Message" we have to extract
- * it and then process it seperately
+ * it and then process it separately
*/
@@ -546,7 +546,7 @@ public class SNSQueueManagerImpl implements QueueManager {
if ( sqsAsync == null ) {
logger.error( "SQS client is null, perhaps it failed to initialize successfully" );
return;
- }
+ }
for ( Object body : bodies ) {
sendMessage( ( Serializable ) body );
[12/12] usergrid git commit: Fixes incorrect units on timeout from
millis to seconds.
Posted by mr...@apache.org.
Fixes incorrect units on timeout from millis to seconds.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/1fe1d1a3
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/1fe1d1a3
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/1fe1d1a3
Branch: refs/heads/2.1-release
Commit: 1fe1d1a34e806ebfc53d0c6cb729e7e4aab804a1
Parents: e50835f
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Oct 20 11:37:03 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Oct 20 11:44:35 2015 -0600
----------------------------------------------------------------------
.../corepersistence/asyncevents/AmazonAsyncEventService.java | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/1fe1d1a3/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 6f779b5..7034a67 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -25,6 +25,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -491,8 +492,11 @@ public class AmazonAsyncEventService implements AsyncEventService {
final UUID newMessageId = UUIDGenerator.newTimeUUID();
+ final int expirationTimeInSeconds =
+ ( int ) TimeUnit.MILLISECONDS.toSeconds( indexProcessorFig.getIndexMessageTtl() );
+
//write to the map in ES
- esMapPersistence.putString( newMessageId.toString(), jsonValue, indexProcessorFig.getIndexMessageTtl() );
+ esMapPersistence.putString( newMessageId.toString(), jsonValue, expirationTimeInSeconds );
[03/12] usergrid git commit: Merge branch 'refs/heads/2.1-release'
into USERGRID-1048
Posted by mr...@apache.org.
Merge branch 'refs/heads/2.1-release' into USERGRID-1048
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/3e155852
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/3e155852
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/3e155852
Branch: refs/heads/2.1-release
Commit: 3e1558524728c96834cfd66d9e53e3f1b6a7d3d6
Parents: 04a3f47 a09485a
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Oct 19 13:30:04 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Oct 19 13:30:04 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 149 ++++++++++---------
.../asyncevents/AsyncIndexProvider.java | 26 ++--
.../asyncevents/model/AsyncEvent.java | 14 +-
.../asyncevents/model/EdgeDeleteEvent.java | 6 +-
.../asyncevents/model/EdgeIndexEvent.java | 9 +-
.../asyncevents/model/EntityDeleteEvent.java | 8 +-
.../asyncevents/model/EntityIndexEvent.java | 6 +-
.../model/InitializeApplicationIndexEvent.java | 4 +-
.../index/AmazonAsyncEventServiceTest.java | 6 +-
.../cache/CachedEntityCollectionManager.java | 147 ------------------
.../EntityCollectionManagerFactoryImpl.java | 6 -
.../usergrid/persistence/queue/QueueFig.java | 2 +-
.../queue/impl/SNSQueueManagerImpl.java | 8 +-
13 files changed, 135 insertions(+), 256 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3e155852/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index d319ac8,c198674..f8ef5e7
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@@ -97,7 -84,9 +100,8 @@@ public class AmazonAsyncEventService im
public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars
private final QueueManager queue;
- private final QueueScope queueScope;
private final IndexProcessorFig indexProcessorFig;
+ private final QueueFig queueFig;
private final IndexProducer indexProducer;
private final EntityCollectionManagerFactory entityCollectionManagerFactory;
private final IndexLocationStrategyFactory indexLocationStrategyFactory;
@@@ -125,33 -113,28 +129,35 @@@
@Inject
- public AmazonAsyncEventService(final QueueManagerFactory queueManagerFactory,
- final IndexProcessorFig indexProcessorFig,
- final IndexProducer indexProducer,
- final MetricsFactory metricsFactory,
- final EntityCollectionManagerFactory entityCollectionManagerFactory,
- final IndexLocationStrategyFactory indexLocationStrategyFactory,
- final EntityIndexFactory entityIndexFactory,
- final EventBuilder eventBuilder,
- final RxTaskScheduler rxTaskScheduler,
- QueueFig queueFig) {
+ public AmazonAsyncEventService( final QueueManagerFactory queueManagerFactory,
+ final IndexProcessorFig indexProcessorFig,
+ final IndexProducer indexProducer,
+ final MetricsFactory metricsFactory,
+ final EntityCollectionManagerFactory entityCollectionManagerFactory,
+ final IndexLocationStrategyFactory indexLocationStrategyFactory,
+ final EntityIndexFactory entityIndexFactory,
+ final EventBuilder eventBuilder,
+ final MapManagerFactory mapManagerFactory,
++ final QueueFig queueFig,
+ final RxTaskScheduler rxTaskScheduler ) {
this.indexProducer = indexProducer;
this.entityCollectionManagerFactory = entityCollectionManagerFactory;
this.indexLocationStrategyFactory = indexLocationStrategyFactory;
this.entityIndexFactory = entityIndexFactory;
this.eventBuilder = eventBuilder;
+
+ final MapScope mapScope = new MapScopeImpl( CpNamingUtils.getManagementApplicationId(), "indexEvents");
+
+ this.esMapPersistence = mapManagerFactory.createMapManager( mapScope );
+
this.rxTaskScheduler = rxTaskScheduler;
- this.queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL);
+ QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL);
this.queue = queueManagerFactory.getQueueManager(queueScope);
+
this.indexProcessorFig = indexProcessorFig;
+ this.queueFig = queueFig;
this.writeTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.write");
this.readTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.read");
@@@ -271,70 -259,64 +277,78 @@@
logger.debug("callEventHandlers with {} message", messages.size());
}
-- Stream<IndexEventResult> indexEventResults = messages.stream()
-- .map(message -> {
-- AsyncEvent event = null;
-- try {
-- event = (AsyncEvent) message.getBody();
-- } catch (ClassCastException cce) {
-- logger.error("Failed to deserialize message body", cce);
++ Stream<IndexEventResult> indexEventResults = messages.stream().map( message -> {
++ AsyncEvent event = null;
++ try {
++ event = ( AsyncEvent ) message.getBody();
++ }
++ catch ( ClassCastException cce ) {
++ logger.error( "Failed to deserialize message body", cce );
++ }
++
++ if ( event == null ) {
++ logger.error( "AsyncEvent type or event is null!" );
++ return new IndexEventResult( Optional.fromNullable( message ), Optional.<IndexOperationMessage>absent(),
++ System.currentTimeMillis() );
++ }
++
++ final AsyncEvent thisEvent = event;
++ if ( logger.isDebugEnabled() ) {
++ logger.debug( "Processing {} event", event );
++ }
++
++ try {
++ //check for empty sets if this is true
++ boolean validateEmptySets = true;
++ Observable<IndexOperationMessage> indexoperationObservable;
++ //merge each operation to a master observable;
++ if ( event instanceof EdgeDeleteEvent ) {
++ indexoperationObservable = handleEdgeDelete( message );
++ }
++ else if ( event instanceof EdgeIndexEvent ) {
++ indexoperationObservable = handleEdgeIndex( message );
++ }
++ else if ( event instanceof EntityDeleteEvent ) {
++ indexoperationObservable = handleEntityDelete( message );
++ }
++ else if ( event instanceof EntityIndexEvent ) {
++ indexoperationObservable = handleEntityIndexUpdate( message );
++ }
++ else if ( event instanceof InitializeApplicationIndexEvent ) {
++ //does not return observable
++ handleInitializeApplicationIndex( event, message );
++ indexoperationObservable = Observable.just( new IndexOperationMessage() );
++ validateEmptySets = false; //do not check this one for an empty set b/c it will be empty.
++ }
++ else if ( event instanceof ElasticsearchIndexEvent ) {
++ handleIndexOperation( ( ElasticsearchIndexEvent ) event );
++ indexoperationObservable = Observable.just( new IndexOperationMessage() );
++ validateEmptySets = false; //do not check this one for an empty set b/c it will be empty.
}
-- if (event == null) {
-- logger.error("AsyncEvent type or event is null!");
-- return new IndexEventResult(Optional.fromNullable(message), Optional.<IndexOperationMessage>absent(), System.currentTimeMillis());
++ else {
++ throw new Exception( "Unknown EventType" );//TODO: print json instead
}
-- final AsyncEvent thisEvent = event;
-- if (logger.isDebugEnabled()) {
-- logger.debug("Processing {} event", event);
++ //collect all of the emitted operations into a single IndexOperationMessage
++ IndexOperationMessage indexOperationMessage = indexoperationObservable
++ .collect( () -> new IndexOperationMessage(), ( collector, single ) -> collector.ingest( single ) )
++ .toBlocking().lastOrDefault( null );
++
++ if ( validateEmptySets && ( indexOperationMessage == null || indexOperationMessage.isEmpty() ) ) {
++ logger.error( "Received empty index sequence message:({}), body:({}) ", message.getMessageId(),
++ message.getStringBody() );
++ throw new Exception( "Received empty index sequence." );
}
-- try {
-- //check for empty sets if this is true
-- boolean validateEmptySets = true;
-- Observable<IndexOperationMessage> indexoperationObservable;
-- //merge each operation to a master observable;
-- if (event instanceof EdgeDeleteEvent) {
-- indexoperationObservable = handleEdgeDelete(message);
-- } else if (event instanceof EdgeIndexEvent) {
-- indexoperationObservable = handleEdgeIndex(message);
-- } else if (event instanceof EntityDeleteEvent) {
-- indexoperationObservable = handleEntityDelete(message);
-- } else if (event instanceof EntityIndexEvent) {
-- indexoperationObservable = handleEntityIndexUpdate(message);
-- } else if (event instanceof InitializeApplicationIndexEvent) {
-- //does not return observable
-- handleInitializeApplicationIndex(event, message);
-- indexoperationObservable = Observable.just(new IndexOperationMessage());
- validateEmptySets = false; //do not check this one for an empty set b/c it will be empty.
- } else if (event instanceof ElasticsearchIndexEvent){
- handleIndexOperation( (ElasticsearchIndexEvent)event );
- indexoperationObservable = Observable.just( new IndexOperationMessage() );
-- validateEmptySets = false; //do not check this one for an empty set b/c it will be empty.
- }
-
- else {
- } else {
-- throw new Exception("Unknown EventType");//TODO: print json instead
-- }
--
-- //collect all of the
-- IndexOperationMessage indexOperationMessage =
-- indexoperationObservable
-- .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
-- .toBlocking().lastOrDefault(null);
--
-- if (validateEmptySets && (indexOperationMessage == null || indexOperationMessage.isEmpty())) {
-- logger.error("Received empty index sequence message:({}), body:({}) ",
-- message.getMessageId(), message.getStringBody());
-- throw new Exception("Received empty index sequence.");
-- }
--
-- //return type that can be indexed and ack'd later
-- return new IndexEventResult(Optional.fromNullable(message), Optional.fromNullable(indexOperationMessage), thisEvent.getCreationTime());
-- } catch (Exception e) {
-- logger.error("Failed to index message: " + message.getMessageId(), message.getStringBody(), e);
-- return new IndexEventResult(Optional.absent(), Optional.<IndexOperationMessage>absent(), event.getCreationTime());
++ //return type that can be indexed and ack'd later
++ return new IndexEventResult( Optional.fromNullable( message ),
++ Optional.fromNullable( indexOperationMessage ), thisEvent.getCreationTime() );
++ }
++ catch ( Exception e ) {
++ logger.error( "Failed to index message: " + message.getMessageId(), message.getStringBody(), e );
++ return new IndexEventResult( Optional.absent(), Optional.<IndexOperationMessage>absent(),
++ event.getCreationTime());
}
});
@@@ -346,8 -328,7 +360,7 @@@
public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) {
IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(
applicationScope );
- offerTopic(
- new InitializeApplicationIndexEvent( new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ) );
- offer(new InitializeApplicationIndexEvent(queueFig.getPrimaryRegion(), new ReplicatedIndexLocationStrategy(indexLocationStrategy)));
++ offerTopic(new InitializeApplicationIndexEvent(queueFig.getPrimaryRegion(), new ReplicatedIndexLocationStrategy(indexLocationStrategy)));
}
@@@ -413,8 -394,8 +426,8 @@@
final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
-- final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load(edgeIndexEvent.getEntityId()).flatMap(entity -> eventBuilder.buildNewEdge(
-- applicationScope, entity, edge));
++ final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load( edgeIndexEvent.getEntityId() ).flatMap(
++ entity -> eventBuilder.buildNewEdge( applicationScope, entity, edge));
return edgeIndexObservable;
}
@@@ -450,84 -431,9 +463,84 @@@
@Override
public void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId) {
- offer( new EntityDeleteEvent( new EntityIdScope( applicationScope, entityId ) ) );
+ offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ) );
}
+
+ /**
+ * Queue up an indexOperationMessage for multi region execution
+ * @param indexOperationMessage
+ */
+ public void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage ) {
+
+ final String jsonValue = ObjectJsonSerializer.INSTANCE.toString( indexOperationMessage );
+
+ final UUID newMessageId = UUIDGenerator.newTimeUUID();
+
+ //write to the map in ES
+ esMapPersistence.putString( newMessageId.toString(), jsonValue, indexProcessorFig.getIndexMessageTtl() );
+
+
+
+ //now queue up the index message
+
+ final ElasticsearchIndexEvent elasticsearchIndexEvent = new ElasticsearchIndexEvent( newMessageId );
+
+ //send to the topic so all regions index the batch
+
+ offerTopic( elasticsearchIndexEvent );
+ }
+
+ public void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){
+ Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" );
+
+ final UUID messageId = elasticsearchIndexEvent.getIndexBatchId();
+
+ Preconditions.checkNotNull( messageId, "messageId must not be null" );
+
+
+ //load the serialized index batch from the map
+
+ final String message = esMapPersistence.getString( messageId.toString() );
+
+ String highConsistency = null;
+
+ if(message == null){
+ logger.error( "Received message with id {} to process, unable to find it, reading with higher consistency level" );
+
+ highConsistency = esMapPersistence.getStringHighConsistency( messageId.toString() );
+
+ }
+
+ //read the value from the string
+
+ final IndexOperationMessage indexOperationMessage;
+
+ //our original local read has it, parse it.
+ if(message != null){
+ indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class );
+ }
+ //we tried to read it at a higher consistency level and it works
+ else if (highConsistency != null){
+ indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( highConsistency, IndexOperationMessage.class );
+ }
+
+ //we couldn't find it, bail
+ else{
+ logger.error( "Unable to find the ES batch with id {} to process at a higher consistency level" );
+
+ throw new RuntimeException( "Unable to find the ES batch to process with message id " + messageId );
+ }
+
+
+
+ //now execute it
+ indexProducer.put(indexOperationMessage).toBlocking().last();
+
+ }
+
+
+
@Override
public long getQueueDepth() {
return queue.getQueueDepth();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3e155852/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index 3865ecb,8b44714..1649046
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@@ -28,7 -27,7 +27,8 @@@ import org.apache.usergrid.persistence.
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.impl.IndexProducer;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+ import org.apache.usergrid.persistence.queue.QueueFig;
import org.apache.usergrid.persistence.queue.QueueManagerFactory;
import com.google.inject.Inject;
@@@ -52,18 -51,21 +52,24 @@@ public class AsyncIndexProvider impleme
private final IndexLocationStrategyFactory indexLocationStrategyFactory;
private final EntityIndexFactory entityIndexFactory;
private final IndexProducer indexProducer;
+ private final MapManagerFactory mapManagerFactory;
+ private final QueueFig queueFig;
private AsyncEventService asyncEventService;
@Inject
- public AsyncIndexProvider( final IndexProcessorFig indexProcessorFig, final QueueManagerFactory queueManagerFactory,
- final MetricsFactory metricsFactory, final RxTaskScheduler rxTaskScheduler, final
- EntityCollectionManagerFactory entityCollectionManagerFactory,
- final EventBuilder eventBuilder, final IndexLocationStrategyFactory indexLocationStrategyFactory,
- final EntityIndexFactory entityIndexFactory, final IndexProducer indexProducer,
- final MapManagerFactory mapManagerFactory ) {
+ public AsyncIndexProvider(final IndexProcessorFig indexProcessorFig,
+ final QueueManagerFactory queueManagerFactory,
+ final MetricsFactory metricsFactory,
+ final RxTaskScheduler rxTaskScheduler,
+ final EntityCollectionManagerFactory entityCollectionManagerFactory,
+ final EventBuilder eventBuilder,
+ final IndexLocationStrategyFactory indexLocationStrategyFactory,
+ final EntityIndexFactory entityIndexFactory,
- final IndexProducer indexProducer, QueueFig queueFig) {
++ final IndexProducer indexProducer,
++ final MapManagerFactory mapManagerFactory,
++ final QueueFig queueFig) {
this.indexProcessorFig = indexProcessorFig;
this.queueManagerFactory = queueManagerFactory;
@@@ -74,7 -76,7 +80,8 @@@
this.indexLocationStrategyFactory = indexLocationStrategyFactory;
this.entityIndexFactory = entityIndexFactory;
this.indexProducer = indexProducer;
+ this.mapManagerFactory = mapManagerFactory;
+ this.queueFig = queueFig;
}
@@@ -98,11 -100,11 +105,10 @@@
case LOCAL:
return new InMemoryAsyncEventService(eventBuilder, rxTaskScheduler, indexProducer,indexProcessorFig.resolveSynchronously());
case SQS:
-- return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
- entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, mapManagerFactory, rxTaskScheduler );
- entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler,queueFig );
++ throw new IllegalArgumentException("Configuration value of SQS is no longer allowed. Use SNS instead");
case SNS:
return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
- entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, mapManagerFactory, rxTaskScheduler );
- entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler, queueFig);
++ entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler );
default:
throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed");
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3e155852/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3e155852/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
----------------------------------------------------------------------
diff --cc stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
index e83d6f8,5b921d9..8ee47a2
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
@@@ -89,7 -89,7 +93,7 @@@ public class AmazonAsyncEventServiceTes
@Override
protected AsyncEventService getAsyncEventService() {
- return new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, mapManagerFactory, rxTaskScheduler );
- return new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, rxTaskScheduler, queueFig );
++ return new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, rxTaskScheduler );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3e155852/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index a3fa05e,5ab1a4b..58b2a4d
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@@ -226,44 -171,43 +226,44 @@@ public class SNSQueueManagerImpl implem
String multiRegion = fig.getRegionList();
- if (logger.isDebugEnabled())
- logger.debug("MultiRegion Setup specified, regions: [{}]", multiRegion);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "MultiRegion Setup specified, regions: [{}]", multiRegion );
+ }
- String[] regionNames = multiRegion.split(",");
+ String[] regionNames = multiRegion.split( "," );
- final Map<String, String> arrQueueArns = new HashMap<>(regionNames.length + 1);
- final Map<String, String> topicArns = new HashMap<>(regionNames.length + 1);
+ final Map<String, String> arrQueueArns = new HashMap<>( regionNames.length + 1 );
+ final Map<String, String> topicArns = new HashMap<>( regionNames.length + 1 );
- arrQueueArns.put( primaryQueueArn, fig.getRegion() );
- topicArns.put( primaryTopicArn, fig.getRegion() );
+ arrQueueArns.put(primaryQueueArn, fig.getPrimaryRegion());
+ topicArns.put(primaryTopicArn, fig.getPrimaryRegion());
- for (String regionName : regionNames) {
+ for ( String regionName : regionNames ) {
regionName = regionName.trim();
- Regions regions = Regions.fromName(regionName);
- Region region = Region.getRegion(regions);
+ Regions regions = Regions.fromName( regionName );
+ Region region = Region.getRegion( regions );
- AmazonSQSClient sqsClient = createSQSClient(region);
- AmazonSNSClient snsClient = createSNSClient(region); // do this stuff synchronously
+ AmazonSQSClient sqsClient = createSQSClient( region );
+ AmazonSNSClient snsClient = createSNSClient( region ); // do this stuff synchronously
// getTopicArn will create the SNS topic if it doesn't exist
- String topicArn = AmazonNotificationUtils.getTopicArn(snsClient, queueName, true);
- topicArns.put(topicArn, regionName);
+ String topicArn = AmazonNotificationUtils.getTopicArn( snsClient, queueName, true );
+ topicArns.put( topicArn, regionName );
// create the SQS queue if it doesn't exist
- String queueArn = AmazonNotificationUtils.getQueueArnByName(sqsClient, queueName);
- if (queueArn == null) {
- queueUrl = AmazonNotificationUtils.createQueue(sqsClient, queueName, fig);
- queueArn = AmazonNotificationUtils.getQueueArnByUrl(sqsClient, queueUrl);
+ String queueArn = AmazonNotificationUtils.getQueueArnByName( sqsClient, queueName );
+ if ( queueArn == null ) {
+ queueUrl = AmazonNotificationUtils.createQueue( sqsClient, queueName, fig );
+ queueArn = AmazonNotificationUtils.getQueueArnByUrl( sqsClient, queueUrl );
}
- arrQueueArns.put(queueArn, regionName);
+ arrQueueArns.put( queueArn, regionName );
}
- logger.debug("Creating Subscriptions...");
+ logger.debug( "Creating Subscriptions..." );
- for (Map.Entry<String, String> queueArnEntry : arrQueueArns.entrySet()) {
+ for ( Map.Entry<String, String> queueArnEntry : arrQueueArns.entrySet() ) {
String queueARN = queueArnEntry.getKey();
String strSqsRegion = queueArnEntry.getValue();
@@@ -650,10 -519,12 +650,10 @@@
/**
* Get the region
- *
- * @return
*/
private Region getRegion() {
- Regions regions = Regions.fromName( fig.getRegion() );
- return Region.getRegion( regions );
+ Regions regions = Regions.fromName(fig.getPrimaryRegion());
+ return Region.getRegion(regions);
}
[06/12] usergrid git commit: Adds comment
Posted by mr...@apache.org.
Adds comment
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/4013f17e
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/4013f17e
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/4013f17e
Branch: refs/heads/2.1-release
Commit: 4013f17e434a5eb05f4851d28dba6a3e0bfe8fc8
Parents: 19d30ea
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Oct 19 14:10:51 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Oct 19 14:10:51 2015 -0600
----------------------------------------------------------------------
.../corepersistence/asyncevents/AmazonAsyncEventService.java | 4 ++++
1 file changed, 4 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4013f17e/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 67d0dab..ee9054b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -536,6 +536,10 @@ public class AmazonAsyncEventService implements AsyncEventService {
indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class );
}
+
+ //NOTE that we intentionally do NOT delete from the map. We can't know when all regions have consumed the message,
+ //so we'll let compaction on column expiration handle deletion.
+
//read the value from the string
Preconditions.checkNotNull( indexOperationMessage, "indexOperationMessage cannot be null" );
[09/12] usergrid git commit: Merge branch 'USERGRID-1048' of
https://git-wip-us.apache.org/repos/asf/usergrid into USERGRID-1048
Posted by mr...@apache.org.
Merge branch 'USERGRID-1048' of https://git-wip-us.apache.org/repos/asf/usergrid into USERGRID-1048
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/d8e65721
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/d8e65721
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/d8e65721
Branch: refs/heads/2.1-release
Commit: d8e6572196b1c9245854ce1351d4d2171fbde80b
Parents: 3a7e60b 3ec0f58
Author: Michael Russo <mi...@gmail.com>
Authored: Mon Oct 19 17:07:31 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Mon Oct 19 17:07:31 2015 -0700
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 4 +
.../corepersistence/index/PublishRxTest.java | 95 ----------------
.../usergrid/corepersistence/index/RxTest.java | 108 +++++++++++++++++++
.../persistence/core/astyanax/CassandraFig.java | 6 +-
4 files changed, 115 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d8e65721/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------