You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/08/05 01:02:55 UTC
[1/2] incubator-usergrid git commit: Fixes runtime bug with joda time
conflict with Astayanx
Repository: incubator-usergrid
Updated Branches:
refs/heads/USERGRID-900 d67c220dd -> 491008e96
Fixes runtime bug with joda time conflict with Astayanx
Fixes bug in AmazonUtils incorrectly re-throwing missing exception.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/29d115fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/29d115fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/29d115fc
Branch: refs/heads/USERGRID-900
Commit: 29d115fc88cc9a058735f3582bb73aee39ec16e7
Parents: d67c220
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Aug 4 15:13:04 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Aug 4 15:13:04 2015 -0600
----------------------------------------------------------------------
stack/corepersistence/queue/pom.xml | 20 +-
.../queue/util/AmazonNotificationUtils.java | 230 ++++++++++---------
2 files changed, 135 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/29d115fc/stack/corepersistence/queue/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/pom.xml b/stack/corepersistence/queue/pom.xml
index 7780997..2d46dc8 100644
--- a/stack/corepersistence/queue/pom.xml
+++ b/stack/corepersistence/queue/pom.xml
@@ -54,13 +54,19 @@
<!-- tests -->
- <dependency>
- <groupId>org.apache.usergrid</groupId>
- <artifactId>common</artifactId>
- <version>${project.version}</version>
- <classifier>tests</classifier>
- <scope>test</scope>
- </dependency>
+ <dependency>
+ <groupId>org.apache.usergrid</groupId>
+ <artifactId>common</artifactId>
+ <version>${project.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <version>2.8.1</version>
+ </dependency>
<dependency>
<groupId>org.apache.usergrid</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/29d115fc/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
index 1d86823..9561a58 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
@@ -1,141 +1,150 @@
package org.apache.usergrid.persistence.queue.util;
-import com.amazonaws.AmazonServiceException;
-import com.amazonaws.auth.policy.*;
-import com.amazonaws.auth.policy.actions.SQSActions;
-import com.amazonaws.auth.policy.conditions.ConditionFactory;
-import com.amazonaws.services.sns.AmazonSNSClient;
-import com.amazonaws.services.sns.model.*;
-import com.amazonaws.services.sns.util.Topics;
-import com.amazonaws.services.sqs.AmazonSQSClient;
-import com.amazonaws.services.sqs.model.*;
-import org.apache.usergrid.persistence.queue.Queue;
-import org.apache.usergrid.persistence.queue.QueueFig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.queue.QueueFig;
+
+import com.amazonaws.auth.policy.Condition;
+import com.amazonaws.auth.policy.Policy;
+import com.amazonaws.auth.policy.Principal;
+import com.amazonaws.auth.policy.Resource;
+import com.amazonaws.auth.policy.Statement;
+import com.amazonaws.auth.policy.actions.SQSActions;
+import com.amazonaws.auth.policy.conditions.ConditionFactory;
+import com.amazonaws.services.sns.AmazonSNSClient;
+import com.amazonaws.services.sns.model.CreateTopicResult;
+import com.amazonaws.services.sns.model.ListTopicsResult;
+import com.amazonaws.services.sns.model.Topic;
+import com.amazonaws.services.sqs.AmazonSQSClient;
+import com.amazonaws.services.sqs.model.CreateQueueRequest;
+import com.amazonaws.services.sqs.model.CreateQueueResult;
+import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
+import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
+import com.amazonaws.services.sqs.model.GetQueueUrlResult;
+import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
+import com.amazonaws.services.sqs.model.SetQueueAttributesRequest;
+
+
/**
* Created by Jeff West on 5/25/15.
*/
public class AmazonNotificationUtils {
- private static final Logger logger = LoggerFactory.getLogger(AmazonNotificationUtils.class);
+ private static final Logger logger = LoggerFactory.getLogger( AmazonNotificationUtils.class );
+
- public static String createQueue(final AmazonSQSClient sqs,
- final String queueName,
- final QueueFig fig)
- throws Exception {
+ public static String createQueue( final AmazonSQSClient sqs, final String queueName, final QueueFig fig )
+ throws Exception {
- final String deadletterQueueName = String.format("%s_dead", queueName);
- final Map<String, String> deadLetterAttributes = new HashMap<>(2);
+ final String deadletterQueueName = String.format( "%s_dead", queueName );
+ final Map<String, String> deadLetterAttributes = new HashMap<>( 2 );
- deadLetterAttributes.put("MessageRetentionPeriod", fig.getDeadletterRetentionPeriod());
+ deadLetterAttributes.put( "MessageRetentionPeriod", fig.getDeadletterRetentionPeriod() );
- CreateQueueRequest createDeadLetterQueueRequest = new CreateQueueRequest()
- .withQueueName(deadletterQueueName).withAttributes(deadLetterAttributes);
+ CreateQueueRequest createDeadLetterQueueRequest =
+ new CreateQueueRequest().withQueueName( deadletterQueueName ).withAttributes( deadLetterAttributes );
- final CreateQueueResult deadletterResult = sqs.createQueue(createDeadLetterQueueRequest);
+ final CreateQueueResult deadletterResult = sqs.createQueue( createDeadLetterQueueRequest );
- logger.info("Created deadletter queue with url {}", deadletterResult.getQueueUrl());
+ logger.info( "Created deadletter queue with url {}", deadletterResult.getQueueUrl() );
- final String deadletterArn = AmazonNotificationUtils.getQueueArnByName(sqs, deadletterQueueName);
+ final String deadletterArn = AmazonNotificationUtils.getQueueArnByName( sqs, deadletterQueueName );
- String redrivePolicy = String.format("{\"maxReceiveCount\":\"%s\"," +
- " \"deadLetterTargetArn\":\"%s\"}", fig.getQueueDeliveryLimit(), deadletterArn);
+ String redrivePolicy = String
+ .format( "{\"maxReceiveCount\":\"%s\"," + " \"deadLetterTargetArn\":\"%s\"}", fig.getQueueDeliveryLimit(),
+ deadletterArn );
- final Map<String, String> queueAttributes = new HashMap<>(2);
- deadLetterAttributes.put("MessageRetentionPeriod", fig.getRetentionPeriod());
- deadLetterAttributes.put("RedrivePolicy", redrivePolicy);
+ final Map<String, String> queueAttributes = new HashMap<>( 2 );
+ deadLetterAttributes.put( "MessageRetentionPeriod", fig.getRetentionPeriod() );
+ deadLetterAttributes.put( "RedrivePolicy", redrivePolicy );
CreateQueueRequest createQueueRequest = new CreateQueueRequest().
- withQueueName(queueName)
- .withAttributes(queueAttributes);
+ withQueueName( queueName )
+ .withAttributes( queueAttributes );
- CreateQueueResult result = sqs.createQueue(createQueueRequest);
+ CreateQueueResult result = sqs.createQueue( createQueueRequest );
String url = result.getQueueUrl();
- logger.info("Created SQS queue with url {}", url);
+ logger.info( "Created SQS queue with url {}", url );
return url;
}
- public static void setQueuePermissionsToReceive(final AmazonSQSClient sqs,
- final String queueUrl,
- final List<String> topicARNs) throws Exception{
- String queueARN = getQueueArnByUrl(sqs, queueUrl);
+ public static void setQueuePermissionsToReceive( final AmazonSQSClient sqs, final String queueUrl,
+ final List<String> topicARNs ) throws Exception {
- Statement statement = new Statement(Statement.Effect.Allow)
- .withActions(SQSActions.SendMessage)
- .withPrincipals(new Principal("*"))
- .withResources(new Resource(queueARN));
+ String queueARN = getQueueArnByUrl( sqs, queueUrl );
- List<Condition> conditions = new ArrayList<>();
+ Statement statement = new Statement( Statement.Effect.Allow ).withActions( SQSActions.SendMessage )
+ .withPrincipals( new Principal( "*" ) )
+ .withResources( new Resource( queueARN ) );
- for(String topicARN : topicARNs){
+ List<Condition> conditions = new ArrayList<>();
- conditions.add(ConditionFactory.newSourceArnCondition(topicARN));
+ for ( String topicARN : topicARNs ) {
+ conditions.add( ConditionFactory.newSourceArnCondition( topicARN ) );
}
- statement.setConditions(conditions);
+ statement.setConditions( conditions );
- Policy policy = new Policy("SubscriptionPermission").withStatements(statement);
+ Policy policy = new Policy( "SubscriptionPermission" ).withStatements( statement );
final Map<String, String> queueAttributes = new HashMap<>();
- queueAttributes.put("Policy", policy.toJson());
+ queueAttributes.put( "Policy", policy.toJson() );
- SetQueueAttributesRequest queueAttributesRequest = new SetQueueAttributesRequest(queueUrl, queueAttributes);
+ SetQueueAttributesRequest queueAttributesRequest = new SetQueueAttributesRequest( queueUrl, queueAttributes );
try {
- sqs.setQueueAttributes(queueAttributesRequest);
- }catch (Exception e){
- logger.error("Failed to set permissions on QUEUE ARN=[{}] for TOPIC ARNs=[{}]", queueARN, topicARNs.toString(), e);
+ sqs.setQueueAttributes( queueAttributesRequest );
+ }
+ catch ( Exception e ) {
+ logger.error( "Failed to set permissions on QUEUE ARN=[{}] for TOPIC ARNs=[{}]", queueARN,
+ topicARNs.toString(), e );
}
-
-
}
- public static String getQueueArnByName(final AmazonSQSClient sqs,
- final String queueName)
- throws Exception {
+ public static String getQueueArnByName( final AmazonSQSClient sqs, final String queueName ) throws Exception {
String queueUrl = null;
try {
- GetQueueUrlResult result = sqs.getQueueUrl(queueName);
+ GetQueueUrlResult result = sqs.getQueueUrl( queueName );
queueUrl = result.getQueueUrl();
-
- } catch (QueueDoesNotExistException queueDoesNotExistException) {
+ }
+ catch ( QueueDoesNotExistException queueDoesNotExistException ) {
//no op, swallow
- logger.warn("Queue {} does not exist", queueName);
+ logger.warn( "Queue {} does not exist", queueName );
return null;
-
- } catch (Exception e) {
- logger.error(String.format("Failed to get URL for Queue [%s] from SQS", queueName), e);
+ }
+ catch ( Exception e ) {
+ logger.error( String.format( "Failed to get URL for Queue [%s] from SQS", queueName ), e );
throw e;
}
- if (queueUrl != null) {
+ if ( queueUrl != null ) {
try {
- GetQueueAttributesRequest queueAttributesRequest = new GetQueueAttributesRequest(queueUrl)
- .withAttributeNames("All");
+ GetQueueAttributesRequest queueAttributesRequest =
+ new GetQueueAttributesRequest( queueUrl ).withAttributeNames( "All" );
- GetQueueAttributesResult queueAttributesResult = sqs.getQueueAttributes(queueAttributesRequest);
+ GetQueueAttributesResult queueAttributesResult = sqs.getQueueAttributes( queueAttributesRequest );
Map<String, String> sqsAttributeMap = queueAttributesResult.getAttributes();
- return sqsAttributeMap.get("QueueArn");
-
- } catch (Exception e) {
- logger.error("Failed to get queue URL from service", e);
+ return sqsAttributeMap.get( "QueueArn" );
+ }
+ catch ( Exception e ) {
+ logger.error( "Failed to get queue URL from service", e );
throw e;
}
}
@@ -143,75 +152,80 @@ public class AmazonNotificationUtils {
return null;
}
- public static String getQueueArnByUrl(final AmazonSQSClient sqs,
- final String queueUrl)
- throws Exception {
+
+ public static String getQueueArnByUrl( final AmazonSQSClient sqs, final String queueUrl ) throws Exception {
try {
- GetQueueAttributesRequest queueAttributesRequest = new GetQueueAttributesRequest(queueUrl)
- .withAttributeNames("All");
+ GetQueueAttributesRequest queueAttributesRequest =
+ new GetQueueAttributesRequest( queueUrl ).withAttributeNames( "All" );
- GetQueueAttributesResult queueAttributesResult = sqs.getQueueAttributes(queueAttributesRequest);
+ GetQueueAttributesResult queueAttributesResult = sqs.getQueueAttributes( queueAttributesRequest );
Map<String, String> sqsAttributeMap = queueAttributesResult.getAttributes();
- return sqsAttributeMap.get("QueueArn");
-
- } catch (Exception e) {
- logger.error("Failed to get queue URL from service", e);
+ return sqsAttributeMap.get( "QueueArn" );
+ }
+ catch ( Exception e ) {
+ logger.error( "Failed to get queue URL from service", e );
throw e;
}
}
- public static String getTopicArn(final AmazonSNSClient sns,
- final String queueName,
- final boolean createOnMissing)
- throws Exception {
- if (logger.isDebugEnabled())
- logger.debug("Looking up Topic ARN: {}", queueName);
+ public static String getTopicArn( final AmazonSNSClient sns, final String queueName, final boolean createOnMissing )
+ throws Exception {
+
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "Looking up Topic ARN: {}", queueName );
+ }
ListTopicsResult listTopicsResult = sns.listTopics();
String topicArn = null;
- for (Topic topic : listTopicsResult.getTopics()) {
+ for ( Topic topic : listTopicsResult.getTopics() ) {
String arn = topic.getTopicArn();
- if (queueName.equals(arn.substring(arn.lastIndexOf(':')))) {
+ if ( queueName.equals( arn.substring( arn.lastIndexOf( ':' ) ) ) ) {
topicArn = arn;
- logger.info("Found existing topic arn=[{}] for queue=[{}]", topicArn, queueName);
+ logger.info( "Found existing topic arn=[{}] for queue=[{}]", topicArn, queueName );
}
}
- if (topicArn == null && createOnMissing) {
- logger.info("Creating topic for queue=[{}]...", queueName);
+ if ( topicArn == null && createOnMissing ) {
+ logger.info( "Creating topic for queue=[{}]...", queueName );
- CreateTopicResult createTopicResult = sns.createTopic(queueName);
+ CreateTopicResult createTopicResult = sns.createTopic( queueName );
topicArn = createTopicResult.getTopicArn();
- logger.info("Successfully created topic with name {} and arn {}", queueName, topicArn);
- } else {
- logger.error("Error looking up topic ARN for queue=[{}] and createOnMissing=[{}]", queueName, createOnMissing);
+ logger.info( "Successfully created topic with name {} and arn {}", queueName, topicArn );
+ }
+ else {
+ logger.error( "Error looking up topic ARN for queue=[{}] and createOnMissing=[{}]", queueName,
+ createOnMissing );
}
- if (logger.isDebugEnabled())
- logger.debug("Returning Topic ARN=[{}] for Queue=[{}]", topicArn, queueName);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "Returning Topic ARN=[{}] for Queue=[{}]", topicArn, queueName );
+ }
return topicArn;
}
- public static String getQueueUrlByName(final AmazonSQSClient sqs,
- final String queueName) {
+
+ public static String getQueueUrlByName( final AmazonSQSClient sqs, final String queueName ) {
try {
- GetQueueUrlResult result = sqs.getQueueUrl(queueName);
+ GetQueueUrlResult result = sqs.getQueueUrl( queueName );
return result.getQueueUrl();
- } catch (QueueDoesNotExistException e) {
- logger.error("Queue {} does not exist", queueName);
- throw e;
- } catch (Exception e) {
- logger.error("failed to get queue from service", e);
+ }
+ catch ( QueueDoesNotExistException e ) {
+ //no op, return null
+ logger.error( "Queue {} does not exist", queueName );
+ return null;
+ }
+ catch ( Exception e ) {
+ logger.error( "failed to get queue from service", e );
throw e;
}
}
[2/2] incubator-usergrid git commit: Upgrades Jackson to fix this
issue.
Posted by to...@apache.org.
Upgrades Jackson to fix this issue.
https://github.com/FasterXML/jackson-databind/issues/656
Upgrades events to be polymorphic to allow for easy addition and mapping of events without 1 large class for all events
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/491008e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/491008e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/491008e9
Branch: refs/heads/USERGRID-900
Commit: 491008e961ac893f734d12352af2ac040b1ea1fe
Parents: 29d115f
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Aug 4 17:02:53 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Aug 4 17:02:53 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 145 ++++++++++--------
.../asyncevents/EventBuilderImpl.java | 9 +-
.../asyncevents/model/AsyncEvent.java | 153 +++----------------
.../asyncevents/model/EdgeDeleteEvent.java | 36 ++++-
.../asyncevents/model/EdgeIndexEvent.java | 39 ++++-
.../asyncevents/model/EntityDeleteEvent.java | 18 ++-
.../asyncevents/model/EntityIndexEvent.java | 18 ++-
.../model/InitializeApplicationIndexEvent.java | 21 ++-
.../index/AmazonAsyncEventServiceTest.java | 3 -
.../index/AsyncIndexServiceTest.java | 14 +-
stack/corepersistence/pom.xml | 3 +-
.../index/IndexLocationStrategy.java | 4 +-
12 files changed, 222 insertions(+), 241 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/491008e9/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 0429af3..85aecdf 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -25,20 +25,20 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
-import com.codahale.metrics.Histogram;
-import com.google.common.base.Preconditions;
-
-import org.apache.usergrid.corepersistence.asyncevents.model.*;
-import org.apache.usergrid.corepersistence.index.*;
-import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
-import org.apache.usergrid.persistence.index.EntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.IndexLocationStrategy;
-import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.usergrid.corepersistence.asyncevents.model.AsyncEvent;
+import org.apache.usergrid.corepersistence.asyncevents.model.EdgeDeleteEvent;
+import org.apache.usergrid.corepersistence.asyncevents.model.EdgeIndexEvent;
+import org.apache.usergrid.corepersistence.asyncevents.model.EntityDeleteEvent;
+import org.apache.usergrid.corepersistence.asyncevents.model.EntityIndexEvent;
+import org.apache.usergrid.corepersistence.asyncevents.model.InitializeApplicationIndexEvent;
+import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
+import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
+import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
+import org.apache.usergrid.corepersistence.index.ReplicatedIndexLocationStrategy;
+import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
@@ -46,6 +46,10 @@ import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.IndexLocationStrategy;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.queue.QueueManager;
@@ -56,7 +60,9 @@ import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
+import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -211,43 +217,42 @@ public class AmazonAsyncEventService implements AsyncEventService {
}
- private void handleMessages(final List<QueueMessage> messages) {
- if (logger.isDebugEnabled()) logger.debug("handleMessages with {} message", messages.size());
- for (QueueMessage message : messages) {
- final AsyncEvent event = (AsyncEvent) message.getBody();
-
- if (logger.isDebugEnabled()) logger.debug("Processing {} event", event.getEventType());
-
- if (event == null || event.getEventType() == null) {
- logger.error("AsyncEvent type or event is null!");
- } else {
- switch (event.getEventType()) {
+ private void handleMessages( final List<QueueMessage> messages ) {
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "handleMessages with {} message", messages.size() );
+ }
- case EDGE_DELETE:
- handleEdgeDelete(message);
- break;
+ for ( QueueMessage message : messages ) {
+ final AsyncEvent event = ( AsyncEvent ) message.getBody();
- case EDGE_INDEX:
- handleEdgeIndex(message);
- break;
+ logger.debug( "Processing {} event", event );
- case ENTITY_DELETE:
- handleEntityDelete(message);
- break;
+ if ( event == null ) {
+ logger.error( "AsyncEvent type or event is null!" );
+ continue;
+ }
- case ENTITY_INDEX:
- handleEntityIndexUpdate(message);
- break;
- case APPLICATION_INDEX:
- handleInitializeApplicationIndex(message);
- break;
+ if ( event instanceof EdgeDeleteEvent ) {
+ handleEdgeDelete( message );
+ }
+ else if ( event instanceof EdgeIndexEvent ) {
+ handleEdgeIndex( message );
+ }
- default:
- logger.error("Unknown EventType: {}", event.getEventType());
+ else if ( event instanceof EntityDeleteEvent ) {
+ handleEntityDelete( message );
+ }
+ else if ( event instanceof EntityIndexEvent ) {
+ handleEntityIndexUpdate( message );
+ }
- }
+ else if ( event instanceof InitializeApplicationIndexEvent ) {
+ handleInitializeApplicationIndex( message );
+ }
+ else {
+ logger.error( "Unknown EventType: {}", event );
}
messageCycle.update( System.currentTimeMillis() - event.getCreationTime() );
@@ -257,7 +262,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
@Override
public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) {
- IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope);
+ IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(
+ applicationScope );
offer(new InitializeApplicationIndexEvent(new ReplicatedIndexLocationStrategy(indexLocationStrategy)));
}
@@ -272,19 +278,22 @@ public class AmazonAsyncEventService implements AsyncEventService {
public void handleEntityIndexUpdate(final QueueMessage message) {
- Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityIndexUpdate");
+ Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEntityIndexUpdate" );
- final EntityIndexEvent event = (EntityIndexEvent) message.getBody();
+ final AsyncEvent event = ( AsyncEvent ) message.getBody();
Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEntityIndexUpdate");
- Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.ENTITY_INDEX, String.format("Event Type for handleEntityIndexUpdate must be ENTITY_INDEX, got %s", event.getEventType()));
+ Preconditions.checkArgument(event instanceof EntityIndexEvent, String.format("Event Type for handleEntityIndexUpdate must be ENTITY_INDEX, got %s", event.getClass()));
+
+ final EntityIndexEvent entityIndexEvent = (EntityIndexEvent) event;
+
//process the entity immediately
//only process the same version, otherwise ignore
- final EntityIdScope entityIdScope = event.getEntityIdScope();
+ final EntityIdScope entityIdScope = entityIndexEvent.getEntityIdScope();
final ApplicationScope applicationScope = entityIdScope.getApplicationScope();
final Id entityId = entityIdScope.getId();
- final long updatedAfter = event.getUpdatedAfter();
+ final long updatedAfter = entityIndexEvent.getUpdatedAfter();
final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, entityId, updatedAfter);
@@ -310,17 +319,19 @@ public class AmazonAsyncEventService implements AsyncEventService {
final AsyncEvent event = (AsyncEvent) message.getBody();
- Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEdgeIndex");
- Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.EDGE_INDEX, String.format("Event Type for handleEdgeIndex must be EDGE_INDEX, got %s", event.getEventType()));
+ Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEdgeIndex" );
+ Preconditions.checkArgument(event instanceof EdgeIndexEvent, String.format("Event Type for handleEdgeIndex must be EDGE_INDEX, got %s", event.getClass()));
+
+ final EdgeIndexEvent edgeIndexEvent = ( EdgeIndexEvent ) event;
- final ApplicationScope applicationScope = event.getApplicationScope();
- final Edge edge = event.getEdge();
+ final ApplicationScope applicationScope = edgeIndexEvent.getApplicationScope();
+ final Edge edge = edgeIndexEvent.getEdge();
final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
- final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load(event.getEntityId()).flatMap( entity -> eventBuilder.buildNewEdge(
+ final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load(edgeIndexEvent.getEntityId()).flatMap( entity -> eventBuilder.buildNewEdge(
applicationScope, entity, edge ) );
subscibeAndAck( edgeIndexObservable, message );
@@ -339,11 +350,14 @@ public class AmazonAsyncEventService implements AsyncEventService {
final AsyncEvent event = (AsyncEvent) message.getBody();
- Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEdgeDelete");
- Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.EDGE_DELETE, String.format("Event Type for handleEdgeDelete must be EDGE_DELETE, got %s", event.getEventType()));
+ Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEdgeDelete" );
+ Preconditions.checkArgument(event instanceof EdgeDeleteEvent, String.format("Event Type for handleEdgeDelete must be EDGE_DELETE, got %s", event.getClass()));
- final ApplicationScope applicationScope = event.getApplicationScope();
- final Edge edge = event.getEdge();
+
+ final EdgeIndexEvent edgeIndexEvent = ( EdgeIndexEvent ) event;
+
+ final ApplicationScope applicationScope = edgeIndexEvent.getApplicationScope();
+ final Edge edge = edgeIndexEvent.getEdge();
if (logger.isDebugEnabled()) logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge);
@@ -364,12 +378,14 @@ public class AmazonAsyncEventService implements AsyncEventService {
Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityDelete");
final AsyncEvent event = (AsyncEvent) message.getBody();
- Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEntityDelete");
- Preconditions.checkArgument( event.getEventType() == AsyncEvent.EventType.ENTITY_DELETE,
- String.format( "Event Type for handleEntityDelete must be ENTITY_DELETE, got %s", event.getEventType() ) );
+ Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEntityDelete" );
+ Preconditions.checkArgument( event instanceof EntityDeleteEvent,
+ String.format( "Event Type for handleEntityDelete must be ENTITY_DELETE, got %s", event.getClass() ) );
- final ApplicationScope applicationScope = event.getApplicationScope();
- final Id entityId = event.getEntityId();
+
+ final EntityDeleteEvent entityDeleteEvent = ( EntityDeleteEvent ) event;
+ final ApplicationScope applicationScope = entityDeleteEvent.getEntityIdScope().getApplicationScope();
+ final Id entityId = entityDeleteEvent.getEntityIdScope().getId();
if (logger.isDebugEnabled())
logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId);
@@ -391,10 +407,13 @@ public class AmazonAsyncEventService implements AsyncEventService {
Preconditions.checkNotNull(message, "Queue Message cannot be null for handleInitializeApplicationIndex");
final AsyncEvent event = (AsyncEvent) message.getBody();
- Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleInitializeApplicationIndex");
- Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.APPLICATION_INDEX, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getEventType()));
+ Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleInitializeApplicationIndex" );
+ Preconditions.checkArgument(event instanceof InitializeApplicationIndexEvent, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getClass()));
+
+ final InitializeApplicationIndexEvent initializeApplicationIndexEvent =
+ ( InitializeApplicationIndexEvent ) event;
- final IndexLocationStrategy indexLocationStrategy = event.getIndexLocationStrategy();
+ final IndexLocationStrategy indexLocationStrategy = initializeApplicationIndexEvent.getIndexLocationStrategy();
final EntityIndex index = entityIndexFactory.createEntityIndex( indexLocationStrategy );
index.initialize();
ack( message );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/491008e9/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index 46cec2e..4bf5695 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -174,8 +174,15 @@ public class EventBuilderImpl implements EventBuilder {
entity -> {
final Field<Long> modified = entity.getField( Schema.PROPERTY_MODIFIED );
+ /**
+ * We don't have a modified field, so we can't check, pass it through
+ */
+ if ( modified == null ) {
+ return true;
+ }
+
//only re-index if it has been updated and been updated after our timestamp
- return modified != null && modified.getValue() >= entityIndexOperation.getUpdatedSince();
+ return modified.getValue() >= entityIndexOperation.getUpdatedSince();
} )
//perform indexing on the task scheduler and start it
.flatMap( entity -> indexService.indexEntity( applicationScope, entity ) );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/491008e9/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
index 3d22986..6b45297 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
@@ -19,155 +19,42 @@
package org.apache.usergrid.corepersistence.asyncevents.model;
+
+import java.io.Serializable;
+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import org.apache.usergrid.corepersistence.index.ReplicatedIndexLocationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.index.IndexLocationStrategy;
-import org.apache.usergrid.persistence.model.entity.Id;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import java.io.Serializable;
/**
- * Created by Jeff West on 5/25/15.
+ * Marker class for serialization
*/
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class AsyncEvent implements Serializable {
-
- @JsonProperty
- protected IndexLocationStrategy indexLocationStrategy;
-
- @JsonProperty
- protected EventType eventType;
-
- @JsonProperty
- protected EntityIdScope entityIdScope;
-
- @JsonProperty
- protected ApplicationScope applicationScope;
-
- @JsonProperty
- protected Id entityId;
+@JsonIgnoreProperties( ignoreUnknown = true )
+@JsonTypeInfo( use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT, property = "type" )
+@JsonSubTypes( {
+ @JsonSubTypes.Type( value = EdgeDeleteEvent.class, name = "edgeDeleteEvent" ),
+ @JsonSubTypes.Type( value = EdgeIndexEvent.class, name = "edgeIndexEvent" ),
+ @JsonSubTypes.Type( value = EntityDeleteEvent.class, name = "entityDeleteEvent" ),
+ @JsonSubTypes.Type( value = EntityIndexEvent.class, name = "entityIndexEvent" ),
+ @JsonSubTypes.Type( value = InitializeApplicationIndexEvent.class, name = "initializeApplicationIndexEvent" )
+} )
- @JsonProperty
- protected Edge edge;
+public abstract class AsyncEvent implements Serializable {
@JsonProperty
protected long creationTime;
- /**
- * required for jackson, do not delete
- */
+ //set by default, will be overridden when de-serializing
protected AsyncEvent() {
+ creationTime = System.currentTimeMillis();
}
- public AsyncEvent(final EventType eventType) {
-
- this.eventType = eventType;
- this.creationTime = System.currentTimeMillis();
- }
-
- public AsyncEvent(final EventType eventType,
- final EntityIdScope entityIdScope) {
-
- this.eventType = eventType;
- this.entityIdScope = entityIdScope;
- this.creationTime = System.currentTimeMillis();
- }
-
- public AsyncEvent(EventType eventType, IndexLocationStrategy indexLocationStrategy) {
- this.eventType = eventType;
- this.indexLocationStrategy = indexLocationStrategy;
- this.creationTime = System.currentTimeMillis();
- }
-
- public AsyncEvent(EventType eventType, ApplicationScope applicationScope, Edge edge) {
- this.eventType = eventType;
- this.applicationScope = applicationScope;
- this.edge = edge;
- this.creationTime = System.currentTimeMillis();
- }
-
- public AsyncEvent(EventType eventType, ApplicationScope applicationScope, Id entityId, Edge edge) {
- this.eventType = eventType;
- this.applicationScope = applicationScope;
- this.edge = edge;
- this.entityId = entityId;
- this.creationTime = System.currentTimeMillis();
- }
-
- @JsonSerialize()
- public final Id getEntityId() {
- return entityId;
- }
-
- protected void setEntityId(Id entityId) {
- this.entityId = entityId;
- }
-
- @JsonSerialize()
- public final EventType getEventType() {
- return eventType;
- }
-
- protected void setEventType(EventType eventType) {
- this.eventType = eventType;
- }
-
- @JsonSerialize()
- public EntityIdScope getEntityIdScope() {
- return entityIdScope;
- }
-
- protected void setEntityIdScope(EntityIdScope entityIdScope) {
- this.entityIdScope = entityIdScope;
- }
-
- @JsonSerialize()
- public ApplicationScope getApplicationScope() {
- return applicationScope;
- }
-
- protected void setApplicationScope(ApplicationScope applicationScope) {
- this.applicationScope = applicationScope;
- }
-
- @JsonSerialize()
- @JsonDeserialize(as=ReplicatedIndexLocationStrategy.class)
- public IndexLocationStrategy getIndexLocationStrategy() { return indexLocationStrategy; }
-
- protected void setIndexLocationStrategy( IndexLocationStrategy indexLocationStrategy ){
- this.indexLocationStrategy = indexLocationStrategy;
- }
-
- @JsonSerialize()
- public Edge getEdge() {
- return edge;
- }
-
- @JsonSerialize()
- public long getCreationTime() { return creationTime; }
-
- protected void setEdge(Edge edge) {
- this.edge = edge;
- }
-
- public enum EventType {
- EDGE_DELETE,
- EDGE_INDEX,
- ENTITY_DELETE,
- ENTITY_INDEX,
- APPLICATION_INDEX;
-
- public String asString() {
- return toString();
- }
+ public long getCreationTime() {
+ return creationTime;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/491008e9/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeDeleteEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeDeleteEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeDeleteEvent.java
index 3af9818..af16bac 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeDeleteEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeDeleteEvent.java
@@ -19,19 +19,41 @@
package org.apache.usergrid.corepersistence.asyncevents.model;
-import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
-/**
- * Created by Jeff West on 5/25/15.
- */
-@JsonDeserialize(as = AsyncEvent.class)
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
+
public final class EdgeDeleteEvent extends AsyncEvent {
+
+ @JsonProperty
+ protected ApplicationScope applicationScope;
+
+
+ @JsonProperty
+ protected Edge edge;
+
+
public EdgeDeleteEvent() {
}
- public EdgeDeleteEvent(ApplicationScope applicationScope, Edge edge) {
- super(EventType.EDGE_DELETE, applicationScope, edge);
+
+ public EdgeDeleteEvent( ApplicationScope applicationScope, Edge edge ) {
+ this.applicationScope = applicationScope;
+ this.edge = edge;
+ }
+
+
+ public ApplicationScope getApplicationScope() {
+ return applicationScope;
+ }
+
+
+ public Edge getEdge() {
+ return edge;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/491008e9/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java
index cd0118f..c89b828 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java
@@ -20,6 +20,8 @@
package org.apache.usergrid.corepersistence.asyncevents.model;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
@@ -28,13 +30,19 @@ import org.apache.usergrid.persistence.model.entity.Id;
import java.io.Serializable;
-/**
- * Created by Jeff West on 5/25/15.
- */
-@JsonDeserialize(as = AsyncEvent.class)
+
public final class EdgeIndexEvent
- extends AsyncEvent
- implements Serializable {
+ extends AsyncEvent {
+
+
+ @JsonProperty
+ protected ApplicationScope applicationScope;
+
+ @JsonProperty
+ protected Id entityId;
+
+ @JsonProperty
+ protected Edge edge;
/**
* Needed by jackson
@@ -43,6 +51,23 @@ public final class EdgeIndexEvent
}
public EdgeIndexEvent(ApplicationScope applicationScope, Id entityId, Edge edge) {
- super(EventType.EDGE_INDEX, applicationScope, entityId, edge);
+ this.applicationScope = applicationScope;
+ this.entityId = entityId;
+ this.edge = edge;
+ }
+
+
+ public ApplicationScope getApplicationScope() {
+ return applicationScope;
+ }
+
+
+ public Edge getEdge() {
+ return edge;
+ }
+
+
+ public Id getEntityId() {
+ return entityId;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/491008e9/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
index 606deae..847a07d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
@@ -19,20 +19,28 @@
package org.apache.usergrid.corepersistence.asyncevents.model;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.model.entity.Id;
-/**
- * Created by Jeff West on 5/25/15.
- */
-@JsonDeserialize(as = AsyncEvent.class)
public final class EntityDeleteEvent extends AsyncEvent {
+
+
+ @JsonProperty
+ protected EntityIdScope entityIdScope;
+
public EntityDeleteEvent() {
}
public EntityDeleteEvent(EntityIdScope entityIdScope) {
- super(EventType.ENTITY_DELETE, entityIdScope);
+ this.entityIdScope = entityIdScope;
+ }
+
+
+ public EntityIdScope getEntityIdScope() {
+ return entityIdScope;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/491008e9/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
index 81961a0..a04326a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
@@ -19,22 +19,26 @@
package org.apache.usergrid.corepersistence.asyncevents.model;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
-/**
- * Created by Jeff West on 5/25/15.
- */
-@JsonDeserialize(as = AsyncEvent.class)
+
public final class EntityIndexEvent extends AsyncEvent {
+
+ @JsonProperty
+ protected EntityIdScope entityIdScope;
+
+ @JsonProperty
private long updatedAfter;
public EntityIndexEvent() {
}
public EntityIndexEvent(EntityIdScope entityIdScope, final long updatedAfter ) {
- super(EventType.ENTITY_INDEX, entityIdScope);
+ this.entityIdScope = entityIdScope;
this.updatedAfter = updatedAfter;
}
@@ -44,7 +48,7 @@ public final class EntityIndexEvent extends AsyncEvent {
}
- public void setUpdatedAfter( long updatedAfter ) {
- this.updatedAfter = updatedAfter;
+ public EntityIdScope getEntityIdScope() {
+ return entityIdScope;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/491008e9/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
index 8b20651..68f0113 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
@@ -19,20 +19,31 @@
*/
package org.apache.usergrid.corepersistence.asyncevents.model;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.index.IndexLocationStrategy;
/**
* event to init app index
*/
-@JsonDeserialize(as = AsyncEvent.class)
+
public class InitializeApplicationIndexEvent extends AsyncEvent {
- public InitializeApplicationIndexEvent() {
- super(EventType.APPLICATION_INDEX);
- }
+
+
+ @JsonProperty
+ protected IndexLocationStrategy indexLocationStrategy;
+
public InitializeApplicationIndexEvent(final IndexLocationStrategy indexLocationStrategy) {
- super(EventType.APPLICATION_INDEX, indexLocationStrategy);
+ this.indexLocationStrategy = indexLocationStrategy;
+
+ }
+
+ public IndexLocationStrategy getIndexLocationStrategy() {
+ return indexLocationStrategy;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/491008e9/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
index d37701b..4660389 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
@@ -67,9 +67,6 @@ public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest {
public MetricsFactory metricsFactory;
@Inject
- public IndexService indexService;
-
- @Inject
public RxTaskScheduler rxTaskScheduler;
@Inject
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/491008e9/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
index 9b104fc..d34a1a9 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
@@ -131,7 +131,7 @@ public abstract class AsyncIndexServiceTest {
/**
- * Write 10k edges 10 at a time in parallel
+ * Write 500 edges
*/
@@ -139,19 +139,19 @@ public abstract class AsyncIndexServiceTest {
final Id connectingId = createId("connecting");
final Edge edge = CpNamingUtils.createConnectionEdge(connectingId, "likes", testEntity.getId());
- return graphManager.writeEdge(edge).subscribeOn(Schedulers.io());
+ return graphManager.writeEdge( edge );
}).toList().toBlocking().last();
+ //queue up processing
asyncEventService.queueEntityIndexUpdate( applicationScope, testEntity );
- emf.refreshIndex(applicationScope.getApplication().getUuid());
-
- // Thread.sleep( 1000000000000l );
final EntityIndex EntityIndex =
entityIndexFactory.createEntityIndex( indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) );
+ emf.refreshIndex(applicationScope.getApplication().getUuid());
+
final SearchEdge collectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource( collectionEdge );
//query until it's available
@@ -176,13 +176,13 @@ public abstract class AsyncIndexServiceTest {
}
- private CandidateResults getResults( final EntityIndex EntityIndex,
+ private CandidateResults getResults( final EntityIndex entityIndex,
final SearchEdge searchEdge, final SearchTypes searchTypes, final int expectedSize, final int attempts ) {
for ( int i = 0; i < attempts; i++ ) {
final CandidateResults candidateResults =
- EntityIndex.search( searchEdge, searchTypes, "select *", 100, 0 );
+ entityIndex.search( searchEdge, searchTypes, "select *", 100, 0 );
if ( candidateResults.size() == expectedSize ) {
return candidateResults;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/491008e9/stack/corepersistence/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/pom.xml b/stack/corepersistence/pom.xml
index b95f3e1..1ee5272 100644
--- a/stack/corepersistence/pom.xml
+++ b/stack/corepersistence/pom.xml
@@ -65,8 +65,7 @@ limitations under the License.
<guice.version>4.0-beta5</guice.version>
<guicyfig.version>3.2</guicyfig.version>
<hystrix.version>1.4.0</hystrix.version>
- <jackson-2-version>2.4.1</jackson-2-version>
- <jackson-smile.verson>2.4.3</jackson-smile.verson>
+ <jackson-2-version>2.6.0</jackson-2-version>
<mockito.version>1.10.8</mockito.version>
<junit.version>4.11</junit.version>
<kryo-serializers.version>0.26</kryo-serializers.version>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/491008e9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexLocationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexLocationStrategy.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexLocationStrategy.java
index 6b07b9e..e5c8f8f 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexLocationStrategy.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexLocationStrategy.java
@@ -19,6 +19,8 @@
*/
package org.apache.usergrid.persistence.index;
+import java.io.Serializable;
+
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
/**
* location strategy for index
@@ -33,7 +35,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
applicationIndexName = {indexRoot}_applications_{bucketId}
applicationAliasName = {indexRoot}_{appId}_read_alias || {indexRoot}_{appId}_write_alias
*/
-public interface IndexLocationStrategy {
+public interface IndexLocationStrategy extends Serializable {
/**
* get the alias name
* @return