You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/07/14 19:10:11 UTC
[06/14] incubator-usergrid git commit: adding async events
adding async events
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/cbea6e0b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/cbea6e0b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/cbea6e0b
Branch: refs/heads/observable-query-fix
Commit: cbea6e0bdf20d975e538b5ac6ec9b9b7110c0367
Parents: de1daae
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Jul 13 11:15:24 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Jul 13 11:15:24 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 19 +++++++----
.../asyncevents/AsyncEventService.java | 8 +++++
.../asyncevents/InMemoryAsyncEventService.java | 14 ++++++++
.../asyncevents/model/AsyncEvent.java | 17 ++++++++-
.../model/InitializeApplicationIndexEvent.java | 36 ++++++++++++++++++++
.../model/InitializeManagementIndexEvent.java | 32 +++++++++++++++++
.../index/ApplicationIndexLocationStrategy.java | 7 ----
.../index/IndexLocationStrategyFactoryImpl.java | 2 +-
8 files changed, 120 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cbea6e0b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index c5b836b..e15824f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -30,6 +30,7 @@ import com.google.common.base.Preconditions;
import org.apache.usergrid.corepersistence.CpEntityManager;
import org.apache.usergrid.corepersistence.asyncevents.model.*;
import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
+import org.apache.usergrid.persistence.index.IndexLocationStrategy;
import org.apache.usergrid.utils.UUIDUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,11 +77,9 @@ public class AmazonAsyncEventService implements AsyncEventService {
private final IndexProcessorFig indexProcessorFig;
private final IndexService indexService;
private final EntityCollectionManagerFactory entityCollectionManagerFactory;
- private final RxTaskScheduler rxTaskScheduler;
private final Timer readTimer;
private final Timer writeTimer;
- private final Timer messageProcessingTimer;
private final Object mutex = new Object();
@@ -98,12 +97,11 @@ public class AmazonAsyncEventService implements AsyncEventService {
final IndexProcessorFig indexProcessorFig,
final MetricsFactory metricsFactory,
final IndexService indexService,
- final EntityCollectionManagerFactory entityCollectionManagerFactory,
- final RxTaskScheduler rxTaskScheduler) {
+ final EntityCollectionManagerFactory entityCollectionManagerFactory
+ ) {
this.indexService = indexService;
this.entityCollectionManagerFactory = entityCollectionManagerFactory;
- this.rxTaskScheduler = rxTaskScheduler;
final QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME);
this.queue = queueManagerFactory.getQueueManager(queueScope);
@@ -111,7 +109,6 @@ public class AmazonAsyncEventService implements AsyncEventService {
this.writeTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.write");
this.readTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.read");
- this.messageProcessingTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.message_processing");
this.indexErrorCounter = metricsFactory.getCounter(AmazonAsyncEventService.class, "async_event.error");
this.messageCycle = metricsFactory.getHistogram(AmazonAsyncEventService.class, "async_event.message_cycle");
@@ -244,6 +241,16 @@ public class AmazonAsyncEventService implements AsyncEventService {
@Override
+ public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) {
+ offer( new InitializeApplicationIndexEvent( applicationScope ) );
+ }
+
+ @Override
+ public void queueInitializeManagementIndex() {
+ offer( new InitializeManagementIndexEvent( ) );
+ }
+
+ @Override
public void queueEntityIndexUpdate(final ApplicationScope applicationScope,
final Entity entity) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cbea6e0b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index 9fbed39..1c97c3e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -23,6 +23,7 @@ package org.apache.usergrid.corepersistence.asyncevents;
import org.apache.usergrid.corepersistence.index.ReIndexAction;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.index.IndexLocationStrategy;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -34,6 +35,13 @@ public interface AsyncEventService extends ReIndexAction {
/**
+ * Queue initialization of the index for the given application.
+ * @param applicationScope the application whose index should be initialized
+ */
+ void queueInitializeApplicationIndex( final ApplicationScope applicationScope );
+ void queueInitializeManagementIndex( );
+
+ /**
* Queue an entity to be indexed. This will start processing immediately. For implementations that are realtime (akka, in memory)
* We will return a distributed future. For SQS impls, this will return immediately, and the result will not be available.
* After SQS is removed, the tests should be enhanced to ensure that we're processing our queues correctly.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cbea6e0b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index b8e544d..2fc0bb2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -20,6 +20,8 @@
package org.apache.usergrid.corepersistence.asyncevents;
+import com.amazonaws.services.opsworks.model.App;
+import org.apache.usergrid.persistence.index.IndexLocationStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,6 +65,18 @@ public class InMemoryAsyncEventService implements AsyncEventService {
@Override
+ public void queueInitializeApplicationIndex(final ApplicationScope applicationScope) {
+ //index will be initialized locally, don't need to inform other indexes
+ return;
+ }
+
+ @Override
+ public void queueInitializeManagementIndex() {
+
+ }
+
+
+ @Override
public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity ) {
//process the entity immediately
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cbea6e0b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
index 66476f9..b331b6e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
@@ -61,6 +61,12 @@ public class AsyncEvent implements Serializable {
protected AsyncEvent() {
}
+ public AsyncEvent(final EventType eventType) {
+
+ this.eventType = eventType;
+ this.creationTime = System.currentTimeMillis();
+ }
+
public AsyncEvent(final EventType eventType,
final EntityIdScope entityIdScope) {
@@ -69,6 +75,12 @@ public class AsyncEvent implements Serializable {
this.creationTime = System.currentTimeMillis();
}
+ public AsyncEvent(EventType eventType, ApplicationScope applicationScope) {
+ this.eventType = eventType;
+ this.applicationScope = applicationScope;
+ this.creationTime = System.currentTimeMillis();
+ }
+
public AsyncEvent(EventType eventType, ApplicationScope applicationScope, Edge edge) {
this.eventType = eventType;
this.applicationScope = applicationScope;
@@ -136,7 +148,10 @@ public class AsyncEvent implements Serializable {
EDGE_DELETE,
EDGE_INDEX,
ENTITY_DELETE,
- ENTITY_INDEX;
+ ENTITY_INDEX,
+ APPLICATION_INDEX,
+ MANAGEMENT_INDEX;
+ ;
public String asString() {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cbea6e0b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
new file mode 100644
index 0000000..6612e8b
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.corepersistence.asyncevents.model;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+/**
+ * Event to initialize an application index.
+ */
+@JsonDeserialize(as = AsyncEvent.class)
+public class InitializeApplicationIndexEvent extends AsyncEvent {
+ public InitializeApplicationIndexEvent() {
+ }
+ public InitializeApplicationIndexEvent(final ApplicationScope applicationScope) {
+ super(EventType.APPLICATION_INDEX, applicationScope);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cbea6e0b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeManagementIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeManagementIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeManagementIndexEvent.java
new file mode 100644
index 0000000..0af249d
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeManagementIndexEvent.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.corepersistence.asyncevents.model;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
+/**
+ * Event to initialize the management index.
+ */
+@JsonDeserialize(as = AsyncEvent.class)
+public class InitializeManagementIndexEvent extends AsyncEvent{
+ public InitializeManagementIndexEvent(){
+ super(EventType.MANAGEMENT_INDEX);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cbea6e0b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ApplicationIndexLocationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ApplicationIndexLocationStrategy.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ApplicationIndexLocationStrategy.java
index fcfb09b..c105119 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ApplicationIndexLocationStrategy.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ApplicationIndexLocationStrategy.java
@@ -31,11 +31,8 @@ import org.apache.usergrid.utils.StringUtils;
* Strategy for getting the application index name.
*/
class ApplicationIndexLocationStrategy implements IndexLocationStrategy {
- private final ClusterFig clusterFig;
- private final CassandraFig cassandraFig;
private final IndexFig indexFig;
private final ApplicationScope applicationScope;
- private final ApplicationIndexBucketLocator applicationIndexBucketLocator;
private final String indexBucketName;
private final IndexAlias alias;
private final String indexRootName;
@@ -45,12 +42,8 @@ class ApplicationIndexLocationStrategy implements IndexLocationStrategy {
final IndexFig indexFig,
final ApplicationScope applicationScope,
final ApplicationIndexBucketLocator applicationIndexBucketLocator){
- this.clusterFig = clusterFig;
-
- this.cassandraFig = cassandraFig;
this.indexFig = indexFig;
this.applicationScope = applicationScope;
- this.applicationIndexBucketLocator = applicationIndexBucketLocator;
this.indexRootName = clusterFig.getClusterName() + "_" + cassandraFig.getApplicationKeyspace().toLowerCase();
this.alias = new ApplicationIndexAlias(indexFig, applicationScope, indexRootName);
this.indexBucketName = indexRootName + "_applications_" + applicationIndexBucketLocator.getBucket(applicationScope);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cbea6e0b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexLocationStrategyFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexLocationStrategyFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexLocationStrategyFactoryImpl.java
index 2d71e41..6a99890 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexLocationStrategyFactoryImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexLocationStrategyFactoryImpl.java
@@ -47,7 +47,7 @@ public class IndexLocationStrategyFactoryImpl implements IndexLocationStrategyFa
this.applicationLocatorBucketStrategy = applicationLocatorBucketStrategy;
this.coreIndexFig = coreIndexFig;
}
- public IndexLocationStrategy getIndexLocationStrategy(ApplicationScope applicationScope){
+ public IndexLocationStrategy getIndexLocationStrategy( final ApplicationScope applicationScope){
if(CpNamingUtils.getManagementApplicationId().equals(applicationScope.getApplication())){
return new ManagementIndexLocationStrategy(clusterFig,cassandraFig,indexFig,coreIndexFig);
}