You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/04/13 22:42:41 UTC
usergrid git commit: Make the initial write path of indexing require
less I/O.
Repository: usergrid
Updated Branches:
refs/heads/release-2.1.1 a46224419 -> 10ff27cc8
Make the initial write path of indexing require less I/O.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/10ff27cc
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/10ff27cc
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/10ff27cc
Branch: refs/heads/release-2.1.1
Commit: 10ff27cc86923a24c52e881f821870867dd95212
Parents: a462244
Author: Michael Russo <mr...@apigee.com>
Authored: Wed Apr 13 22:42:12 2016 +0200
Committer: Michael Russo <mr...@apigee.com>
Committed: Wed Apr 13 22:42:12 2016 +0200
----------------------------------------------------------------------
.../asyncevents/AsyncEventServiceImpl.java | 102 ++++++++++++++-----
.../asyncevents/model/AsyncEvent.java | 2 +
.../asyncevents/model/EdgeIndexEvent.java | 70 +++++++++++++
.../asyncevents/model/EntityIndexEvent.java | 54 ++++++++++
4 files changed, 200 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/10ff27cc/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 82ad5be..3b01292 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -310,20 +310,33 @@ public class AsyncEventServiceImpl implements AsyncEventService {
logger.debug("Processing event with type {}", event.getClass().getSimpleName());
}
- IndexOperationMessage indexOperationMessage = null;
try {
+ IndexOperationMessage single = new IndexOperationMessage();
+
+ // normal indexing event for an entity
+ if ( event instanceof EntityIndexEvent ){
+
+ single = handleEntityIndexUpdate( message );
+
+ }
+ // normal indexing event for an edge
+ else if ( event instanceof EdgeIndexEvent ){
+
+ single = handleEdgeIndex( message );
+
+ }
// deletes are 2-part, actual IO to delete data, then queue up a de-index
- if ( event instanceof EdgeDeleteEvent ) {
+ else if ( event instanceof EdgeDeleteEvent ) {
- handleEdgeDelete( message );
+ single = handleEdgeDelete( message );
}
// deletes are 2-part, actual IO to delete data, then queue up a de-index
else if ( event instanceof EntityDeleteEvent ) {
- handleEntityDelete( message );
+ single = handleEntityDelete( message );
}
- // application initialization has special logic, therefore a special event type
+ // initialization has special logic, therefore a special event type and no index operation message
else if ( event instanceof InitializeApplicationIndexEvent ) {
handleInitializeApplicationIndex(event, message);
@@ -331,11 +344,11 @@ public class AsyncEventServiceImpl implements AsyncEventService {
// this is the main event that pulls the index doc from map persistence and hands to the index producer
else if (event instanceof ElasticsearchIndexEvent) {
- indexOperationMessage = handleIndexOperation((ElasticsearchIndexEvent) event);
+ handleIndexOperation((ElasticsearchIndexEvent) event);
} else if (event instanceof DeIndexOldVersionsEvent) {
- indexOperationMessage = handleDeIndexOldVersionEvent((DeIndexOldVersionsEvent) event);
+ single = handleDeIndexOldVersionEvent((DeIndexOldVersionsEvent) event);
} else {
@@ -343,9 +356,8 @@ public class AsyncEventServiceImpl implements AsyncEventService {
}
- // returning indexOperationMessage will send that indexOperationMessage to the index producer
// if no exception happens and the QueueMessage is returned in these results, it will get ack'd
- return new IndexEventResult(Optional.fromNullable(indexOperationMessage), Optional.of(message), thisEvent.getCreationTime());
+ return new IndexEventResult(Optional.of(single), Optional.of(message), thisEvent.getCreationTime());
} catch (IndexDocNotFoundException e){
@@ -382,6 +394,8 @@ public class AsyncEventServiceImpl implements AsyncEventService {
final Entity entity, long updatedAfter) {
+ offer(new EntityIndexEvent(queueFig.getPrimaryRegion(),new EntityIdScope(applicationScope, entity.getId()), 0));
+
final EntityIndexOperation entityIndexOperation =
new EntityIndexOperation( applicationScope, entity.getId(), updatedAfter);
@@ -392,19 +406,56 @@ public class AsyncEventServiceImpl implements AsyncEventService {
}
+ /**
+  * Builds the index operation for the entity referenced by an {@link EntityIndexEvent}.
+  *
+  * @param message queue message whose body must be an {@link EntityIndexEvent}
+  * @return the last {@link IndexOperationMessage} emitted by the event builder, or {@code null}
+  *         when the builder emits nothing
+  * @throws NullPointerException if {@code message} or its body is null
+  * @throws IllegalArgumentException if the body is not an {@link EntityIndexEvent}
+  */
+ private IndexOperationMessage handleEntityIndexUpdate(final QueueMessage message) {
+
+     Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEntityIndexUpdate" );
+
+     final AsyncEvent event = ( AsyncEvent ) message.getBody();
+
+     // check the body (not the message a second time) so a missing body fails here with this
+     // message instead of as a bare NullPointerException from event.getClass() below
+     Preconditions.checkNotNull( event, "QueueMessage Body cannot be null for handleEntityIndexUpdate" );
+     Preconditions.checkArgument(event instanceof EntityIndexEvent, String.format("Event Type for handleEntityIndexUpdate must be ENTITY_INDEX, got %s", event.getClass()));
+
+     final EntityIndexEvent entityIndexEvent = (EntityIndexEvent) event;
+
+
+     //process the entity immediately
+     //only process the same version, otherwise ignore
+     final EntityIdScope entityIdScope = entityIndexEvent.getEntityIdScope();
+     final ApplicationScope applicationScope = entityIdScope.getApplicationScope();
+     final Id entityId = entityIdScope.getId();
+     final long updatedAfter = entityIndexEvent.getUpdatedAfter();
+
+     final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, entityId, updatedAfter);
+
+     // NOTE(review): lastOrDefault(null) can yield null while the dispatching code wraps the
+     // handler result in Optional.of(...) - confirm null is handled there
+     return eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null);
+ }
+
@Override
public void queueNewEdge(final ApplicationScope applicationScope,
final Entity entity,
final Edge newEdge) {
- final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
+ offer( new EdgeIndexEvent( queueFig.getPrimaryRegion(), applicationScope, entity.getId(), newEdge ));
- final IndexOperationMessage indexMessage = ecm.load( entity.getId() )
- .flatMap( loadedEntity -> eventBuilder.buildNewEdge(applicationScope, entity, newEdge) )
- .toBlocking().lastOrDefault(null);
+ }
- queueIndexOperationMessage( indexMessage );
+ /**
+  * Builds the index operation for the new edge described by an {@link EdgeIndexEvent}: the
+  * event's entity is loaded first, then the event builder produces the edge index message.
+  *
+  * @param message queue message whose body must be an {@link EdgeIndexEvent}
+  * @return the last {@link IndexOperationMessage} emitted, or {@code null} when nothing is emitted
+  * @throws NullPointerException if {@code message} or its body is null
+  * @throws IllegalArgumentException if the body is not an {@link EdgeIndexEvent}
+  */
+ private IndexOperationMessage handleEdgeIndex(final QueueMessage message) {
+
+     Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEdgeIndex" );
+
+     final AsyncEvent event = (AsyncEvent) message.getBody();
+
+     // check the body (not the message a second time) so a missing body fails here with this
+     // message instead of as a bare NullPointerException from event.getClass() below
+     Preconditions.checkNotNull( event, "QueueMessage Body cannot be null for handleEdgeIndex" );
+     Preconditions.checkArgument(event instanceof EdgeIndexEvent, String.format("Event Type for handleEdgeIndex must be EDGE_INDEX, got %s", event.getClass()));
+
+     final EdgeIndexEvent edgeIndexEvent = ( EdgeIndexEvent ) event;
+     final ApplicationScope applicationScope = edgeIndexEvent.getApplicationScope();
+
+     final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
+
+     // the edge index message is built from the entity as loaded here, not from a copy carried in the event
+     return ecm.load( edgeIndexEvent.getEntityId() )
+         .flatMap( loadedEntity -> eventBuilder.buildNewEdge(applicationScope, loadedEntity, edgeIndexEvent.getEdge()) )
+         .toBlocking().lastOrDefault(null);
}
@@ -417,7 +468,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ) );
}
- public void handleEdgeDelete(final QueueMessage message) {
+ private IndexOperationMessage handleEdgeDelete(final QueueMessage message) {
Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEdgeDelete" );
@@ -436,10 +487,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge);
}
- IndexOperationMessage indexMessage =
- eventBuilder.buildDeleteEdge(applicationScope, edge).toBlocking().lastOrDefault(null);
-
- queueIndexOperationMessage(indexMessage);
+ return eventBuilder.buildDeleteEdge(applicationScope, edge).toBlocking().lastOrDefault(null);
}
@@ -478,7 +526,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
offerTopic( elasticsearchIndexEvent );
}
- public IndexOperationMessage handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){
+ private void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent) throws IndexDocNotFoundException {
Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" );
@@ -529,7 +577,8 @@ public class AsyncEventServiceImpl implements AsyncEventService {
// always do a check to ensure the indexes are initialized for the index requests
initializeEntityIndexes(indexOperationMessage);
- return indexOperationMessage;
+ // send it to be indexed
+ indexProducer.put(indexOperationMessage).toBlocking().last();
}
@@ -599,7 +648,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ) );
}
- public void handleEntityDelete(final QueueMessage message) {
+ private IndexOperationMessage handleEntityDelete(final QueueMessage message) {
Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityDelete");
@@ -625,14 +674,12 @@ public class AsyncEventServiceImpl implements AsyncEventService {
entityDeleteResults.getCompactedNode().toBlocking().lastOrDefault(null);
- IndexOperationMessage indexMessage = entityDeleteResults.getIndexObservable().toBlocking().lastOrDefault(null);
-
- queueIndexOperationMessage(indexMessage);
+ return entityDeleteResults.getIndexObservable().toBlocking().lastOrDefault(null);
}
- public void handleInitializeApplicationIndex(final AsyncEvent event, final QueueMessage message) {
+ private void handleInitializeApplicationIndex(final AsyncEvent event, final QueueMessage message) {
Preconditions.checkNotNull(message, "Queue Message cannot be null for handleInitializeApplicationIndex");
Preconditions.checkArgument(event instanceof InitializeApplicationIndexEvent, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getClass()));
@@ -793,8 +840,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
// collect into a list of QueueMessages that can be ack'd later
.collect(Collectors.toList());
- // sumbit the requests to Elasticsearch
- indexProducer.put(combined).toBlocking().last();
+ queueIndexOperationMessage(combined);
return queueMessages;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/10ff27cc/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
index 7b2b228..bd581ad 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
@@ -38,6 +38,8 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
@JsonIgnoreProperties( ignoreUnknown = true )
@JsonTypeInfo( use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT, property = "type" )
@JsonSubTypes( {
+ @JsonSubTypes.Type( value = EntityIndexEvent.class, name = "entityIndexEvent" ),
+ @JsonSubTypes.Type( value = EdgeIndexEvent.class, name = "edgeIndexEvent" ),
@JsonSubTypes.Type( value = EdgeDeleteEvent.class, name = "edgeDeleteEvent" ),
@JsonSubTypes.Type( value = EntityDeleteEvent.class, name = "entityDeleteEvent" ),
@JsonSubTypes.Type( value = InitializeApplicationIndexEvent.class, name = "initializeApplicationIndexEvent" ),
http://git-wip-us.apache.org/repos/asf/usergrid/blob/10ff27cc/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java
new file mode 100644
index 0000000..6164dce
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents.model;
+
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+public final class EdgeIndexEvent
+ extends AsyncEvent {
+ // Queue event for indexing a new edge: offered by AsyncEventServiceImpl.queueNewEdge and
+ // consumed by AsyncEventServiceImpl.handleEdgeIndex. The scope below selects the collection manager.
+ @JsonProperty
+ protected ApplicationScope applicationScope;
+ // Id of the entity that the handler loads before building the edge index message.
+ @JsonProperty
+ protected Id entityId;
+ // The new edge to be indexed.
+ @JsonProperty
+ protected Edge edge;
+
+ /**
+ * No-arg constructor needed by Jackson; leaves all fields unset.
+ */
+ public EdgeIndexEvent() {
+ super();
+ }
+ // Creates an event for the given source region, application scope, entity id and edge.
+ public EdgeIndexEvent(String sourceRegion, ApplicationScope applicationScope, Id entityId, Edge edge) {
+ super(sourceRegion);
+ this.applicationScope = applicationScope;
+ this.entityId = entityId;
+ this.edge = edge;
+ }
+
+
+ public ApplicationScope getApplicationScope() {
+ return applicationScope;
+ }
+
+
+ public Edge getEdge() {
+ return edge;
+ }
+
+
+ public Id getEntityId() {
+ return entityId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/10ff27cc/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
new file mode 100644
index 0000000..7e8184b
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+
+
+public final class EntityIndexEvent extends AsyncEvent {
+ // Queue event for indexing one entity; consumed by AsyncEventServiceImpl.handleEntityIndexUpdate.
+ // Application scope plus id of the entity to index.
+ @JsonProperty
+ protected EntityIdScope entityIdScope;
+ // Passed by the handler as the updatedAfter argument of EntityIndexOperation.
+ @JsonProperty
+ private long updatedAfter;
+ // No-arg constructor; presumably needed by Jackson, as on EdgeIndexEvent - TODO confirm.
+ public EntityIndexEvent() {
+ super();
+ }
+ // Creates an event for the given source region, entity id scope and updatedAfter value.
+ public EntityIndexEvent(String sourceRegion, EntityIdScope entityIdScope, final long updatedAfter ) {
+ super(sourceRegion);
+ this.entityIdScope = entityIdScope;
+ this.updatedAfter = updatedAfter;
+ }
+
+
+ public long getUpdatedAfter() {
+ return updatedAfter;
+ }
+
+
+ public EntityIdScope getEntityIdScope() {
+ return entityIdScope;
+ }
+}