You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/07/13 22:09:49 UTC
[2/2] incubator-usergrid git commit: adding async events
adding async events
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a63c8175
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a63c8175
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a63c8175
Branch: refs/heads/USERGRID-840
Commit: a63c8175d362054f49ce25de53dbbbdf402e2141
Parents: cbea6e0
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Jul 13 14:09:37 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Jul 13 14:09:37 2015 -0600
----------------------------------------------------------------------
.../corepersistence/CpEntityManagerFactory.java | 4 ++-
.../asyncevents/AmazonAsyncEventService.java | 35 ++++++++++++++++----
.../asyncevents/AsyncEventService.java | 1 -
.../asyncevents/AsyncIndexProvider.java | 14 ++++++--
.../asyncevents/InMemoryAsyncEventService.java | 6 ----
.../asyncevents/model/AsyncEvent.java | 4 +--
.../model/InitializeApplicationIndexEvent.java | 1 +
.../model/InitializeManagementIndexEvent.java | 32 ------------------
.../index/AmazonAsyncEventServiceTest.java | 10 +++++-
9 files changed, 54 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a63c8175/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 048c558..f24891e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -154,6 +154,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
private void init() {
EntityManager em = getEntityManager(getManagementAppId());
+ indexService.queueInitializeApplicationIndex(CpNamingUtils.getApplicationScope(getManagementAppId()));
try {
if ( em.getApplication() == null ) {
@@ -252,9 +253,10 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
}
getSetup().setupApplicationKeyspace( applicationId, appName );
+ indexService.queueInitializeApplicationIndex(CpNamingUtils.getApplicationScope(applicationId));
if ( properties == null ) {
- properties = new TreeMap<>( CASE_INSENSITIVE_ORDER );
+ properties = new TreeMap<>( CASE_INSENSITIVE_ORDER);
}
properties.put( PROPERTY_NAME, appName );
EntityManager appEm = getEntityManager(applicationId);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a63c8175/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index e15824f..50000df 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -29,7 +29,10 @@ import com.codahale.metrics.Histogram;
import com.google.common.base.Preconditions;
import org.apache.usergrid.corepersistence.CpEntityManager;
import org.apache.usergrid.corepersistence.asyncevents.model.*;
+import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
+import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.IndexLocationStrategy;
import org.apache.usergrid.utils.UUIDUtils;
import org.slf4j.Logger;
@@ -77,6 +80,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
private final IndexProcessorFig indexProcessorFig;
private final IndexService indexService;
private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+ private final IndexLocationStrategyFactory indexLocationStrategyFactory;
+ private final EntityIndexFactory entityIndexFactory;
private final Timer readTimer;
private final Timer writeTimer;
@@ -97,11 +102,15 @@ public class AmazonAsyncEventService implements AsyncEventService {
final IndexProcessorFig indexProcessorFig,
final MetricsFactory metricsFactory,
final IndexService indexService,
- final EntityCollectionManagerFactory entityCollectionManagerFactory
+ final EntityCollectionManagerFactory entityCollectionManagerFactory,
+ final IndexLocationStrategyFactory indexLocationStrategyFactory,
+ final EntityIndexFactory entityIndexFactory
) {
this.indexService = indexService;
this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+ this.indexLocationStrategyFactory = indexLocationStrategyFactory;
+ this.entityIndexFactory = entityIndexFactory;
final QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME);
this.queue = queueManagerFactory.getQueueManager(queueScope);
@@ -229,6 +238,10 @@ public class AmazonAsyncEventService implements AsyncEventService {
handleEntityIndexUpdate(message);
break;
+
+ case APPLICATION_INDEX:
+ handleInitializeApplicationIndex(message);
+ break; // without this the case falls through to default and logs "Unknown EventType"
default:
logger.error("Unknown EventType: {}", event.getEventType());
@@ -242,13 +255,9 @@ public class AmazonAsyncEventService implements AsyncEventService {
@Override
public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) {
- offer( new InitializeApplicationIndexEvent( applicationScope ) );
+ offer(new InitializeApplicationIndexEvent(applicationScope));
}
- @Override
- public void queueInitializeManagementIndex() {
- offer( new InitializeManagementIndexEvent( ) );
- }
@Override
public void queueEntityIndexUpdate(final ApplicationScope applicationScope,
@@ -363,6 +372,20 @@ public class AmazonAsyncEventService implements AsyncEventService {
}
+ public void handleInitializeApplicationIndex(final QueueMessage message) {
+ // Handles an APPLICATION_INDEX event: initializes the entity index for the event's application scope, then acks the message.
+ Preconditions.checkNotNull(message, "Queue Message cannot be null for handleInitializeApplicationIndex");
+ final AsyncEvent event = (AsyncEvent) message.getBody();
+ Preconditions.checkNotNull(event, "QueueMessage Body cannot be null for handleInitializeApplicationIndex");
+ Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.APPLICATION_INDEX, "Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getEventType());
+ // ack(message) below is reached only if index.initialize() returns normally.
+ final ApplicationScope applicationScope = event.getApplicationScope();
+ final IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope);
+ final EntityIndex index = entityIndexFactory.createEntityIndex(indexLocationStrategy);
+ index.initialize();
+ ack(message);
+ }
+
/**
* Loop through and start the workers
*/
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a63c8175/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index 1c97c3e..a36a9ae 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -39,7 +39,6 @@ public interface AsyncEventService extends ReIndexAction {
* @param applicationScope
*/
void queueInitializeApplicationIndex( final ApplicationScope applicationScope );
- void queueInitializeManagementIndex( );
/**
* Queue an entity to be indexed. This will start processing immediately. For implementations that are realtime (akka, in memory)
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a63c8175/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index f455f9c..0a58369 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -20,11 +20,13 @@
package org.apache.usergrid.corepersistence.asyncevents;
+import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
import org.apache.usergrid.corepersistence.index.IndexService;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.queue.QueueManagerFactory;
import com.google.inject.Inject;
@@ -46,6 +48,8 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
private final RxTaskScheduler rxTaskScheduler;
private final EntityCollectionManagerFactory entityCollectionManagerFactory;
private final EventBuilder eventBuilder;
+ private final IndexLocationStrategyFactory indexLocationStrategyFactory;
+ private final EntityIndexFactory entityIndexFactory;
private AsyncEventService asyncEventService;
@@ -57,7 +61,9 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
final IndexService indexService,
final RxTaskScheduler rxTaskScheduler,
final EntityCollectionManagerFactory entityCollectionManagerFactory,
- final EventBuilder eventBuilder) {
+ final EventBuilder eventBuilder,
+ final IndexLocationStrategyFactory indexLocationStrategyFactory,
+ final EntityIndexFactory entityIndexFactory) {
this.indexProcessorFig = indexProcessorFig;
this.queueManagerFactory = queueManagerFactory;
@@ -66,6 +72,8 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
this.rxTaskScheduler = rxTaskScheduler;
this.entityCollectionManagerFactory = entityCollectionManagerFactory;
this.eventBuilder = eventBuilder;
+ this.indexLocationStrategyFactory = indexLocationStrategyFactory;
+ this.entityIndexFactory = entityIndexFactory;
}
@@ -90,10 +98,10 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
return new InMemoryAsyncEventService(eventBuilder, rxTaskScheduler, indexProcessorFig.resolveSynchronously());
case SQS:
return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, metricsFactory, indexService,
- entityCollectionManagerFactory, rxTaskScheduler);
+ entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory );
case SNS:
return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, metricsFactory, indexService,
- entityCollectionManagerFactory, rxTaskScheduler);
+ entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory );
default:
throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed");
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a63c8175/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index 2fc0bb2..adb4a90 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -71,12 +71,6 @@ public class InMemoryAsyncEventService implements AsyncEventService {
}
@Override
- public void queueInitializeManagementIndex() {
-
- }
-
-
- @Override
public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity ) {
//process the entity immediately
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a63c8175/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
index b331b6e..9278e0f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
@@ -149,9 +149,7 @@ public class AsyncEvent implements Serializable {
EDGE_INDEX,
ENTITY_DELETE,
ENTITY_INDEX,
- APPLICATION_INDEX,
- MANAGEMENT_INDEX;
- ;
+ APPLICATION_INDEX;
public String asString() {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a63c8175/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
index 6612e8b..656d820 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
@@ -28,6 +28,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@JsonDeserialize(as = AsyncEvent.class)
public class InitializeApplicationIndexEvent extends AsyncEvent {
public InitializeApplicationIndexEvent() {
+ super(EventType.APPLICATION_INDEX);
}
public InitializeApplicationIndexEvent(final ApplicationScope applicationScope) {
super(EventType.APPLICATION_INDEX, applicationScope);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a63c8175/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeManagementIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeManagementIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeManagementIndexEvent.java
deleted file mode 100644
index 0af249d..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeManagementIndexEvent.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. The ASF licenses this file to You
- * * under the Apache License, Version 2.0 (the "License"); you may not
- * * use this file except in compliance with the License.
- * * You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License. For additional information regarding
- * * copyright in this work, please see the NOTICE file in the top level
- * * directory of this distribution.
- *
- */
-package org.apache.usergrid.corepersistence.asyncevents.model;
-
-import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-
-/**
- * Event to initialize mgmt index
- */
-@JsonDeserialize(as = AsyncEvent.class)
-public class InitializeManagementIndexEvent extends AsyncEvent{
- public InitializeManagementIndexEvent(){
- super(EventType.MANAGEMENT_INDEX);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a63c8175/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
index 02f42b7..9cf896c 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
@@ -20,6 +20,7 @@
package org.apache.usergrid.corepersistence.index;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.junit.Rule;
import org.junit.runner.RunWith;
@@ -71,10 +72,17 @@ public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest {
public RxTaskScheduler rxTaskScheduler;
+ @Inject
+ public IndexLocationStrategyFactory indexLocationStrategyFactory;
+
+
+ @Inject
+ public EntityIndexFactory entityIndexFactory;
+
@Override
protected AsyncEventService getAsyncEventService() {
return new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, metricsFactory, indexService,
- entityCollectionManagerFactory, rxTaskScheduler );
+ entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory );
}