You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/04/13 22:42:41 UTC

usergrid git commit: Make the initial write path of indexing require less I/O.

Repository: usergrid
Updated Branches:
  refs/heads/release-2.1.1 a46224419 -> 10ff27cc8


Make the initial write path of indexing require less I/O.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/10ff27cc
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/10ff27cc
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/10ff27cc

Branch: refs/heads/release-2.1.1
Commit: 10ff27cc86923a24c52e881f821870867dd95212
Parents: a462244
Author: Michael Russo <mr...@apigee.com>
Authored: Wed Apr 13 22:42:12 2016 +0200
Committer: Michael Russo <mr...@apigee.com>
Committed: Wed Apr 13 22:42:12 2016 +0200

----------------------------------------------------------------------
 .../asyncevents/AsyncEventServiceImpl.java      | 102 ++++++++++++++-----
 .../asyncevents/model/AsyncEvent.java           |   2 +
 .../asyncevents/model/EdgeIndexEvent.java       |  70 +++++++++++++
 .../asyncevents/model/EntityIndexEvent.java     |  54 ++++++++++
 4 files changed, 200 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/10ff27cc/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 82ad5be..3b01292 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -310,20 +310,33 @@ public class AsyncEventServiceImpl implements AsyncEventService {
                 logger.debug("Processing event with type {}", event.getClass().getSimpleName());
             }
 
-            IndexOperationMessage indexOperationMessage = null;
             try {
 
+                IndexOperationMessage single = new IndexOperationMessage();
+
+                // normal indexing event for an entity
+                if ( event instanceof  EntityIndexEvent ){
+
+                     single = handleEntityIndexUpdate( message );
+
+                }
+                // normal indexing event for an edge
+                else if ( event instanceof EdgeIndexEvent ){
+
+                    single = handleEdgeIndex( message );
+
+                }
                 // deletes are 2-part, actual IO to delete data, then queue up a de-index
-                if ( event instanceof EdgeDeleteEvent ) {
+                else if ( event instanceof EdgeDeleteEvent ) {
 
-                    handleEdgeDelete( message );
+                    single = handleEdgeDelete( message );
                 }
                 // deletes are 2-part, actual IO to delete data, then queue up a de-index
                 else if ( event instanceof EntityDeleteEvent ) {
 
-                    handleEntityDelete( message );
+                    single = handleEntityDelete( message );
                 }
-                // application initialization has special logic, therefore a special event type
+                // initialization has special logic, therefore a special event type and no index operation message
                 else if ( event instanceof InitializeApplicationIndexEvent ) {
 
                     handleInitializeApplicationIndex(event, message);
@@ -331,11 +344,11 @@ public class AsyncEventServiceImpl implements AsyncEventService {
                 // this is the main event that pulls the index doc from map persistence and hands to the index producer
                 else if (event instanceof ElasticsearchIndexEvent) {
 
-                    indexOperationMessage = handleIndexOperation((ElasticsearchIndexEvent) event);
+                    handleIndexOperation((ElasticsearchIndexEvent) event);
 
                 } else if (event instanceof DeIndexOldVersionsEvent) {
 
-                    indexOperationMessage = handleDeIndexOldVersionEvent((DeIndexOldVersionsEvent) event);
+                    single = handleDeIndexOldVersionEvent((DeIndexOldVersionsEvent) event);
 
                 } else {
 
@@ -343,9 +356,8 @@ public class AsyncEventServiceImpl implements AsyncEventService {
                 }
 
 
-                // returning indexOperationMessage will send that indexOperationMessage to the index producer
                 // if no exception happens and the QueueMessage is returned in these results, it will get ack'd
-                return new IndexEventResult(Optional.fromNullable(indexOperationMessage), Optional.of(message), thisEvent.getCreationTime());
+                return new IndexEventResult(Optional.of(single), Optional.of(message), thisEvent.getCreationTime());
 
             } catch (IndexDocNotFoundException e){
 
@@ -382,6 +394,8 @@ public class AsyncEventServiceImpl implements AsyncEventService {
                                        final Entity entity, long updatedAfter) {
 
 
+        offer(new EntityIndexEvent(queueFig.getPrimaryRegion(),new EntityIdScope(applicationScope, entity.getId()), 0));
+
         final EntityIndexOperation entityIndexOperation =
             new EntityIndexOperation( applicationScope, entity.getId(), updatedAfter);
 
@@ -392,19 +406,56 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
     }
 
+    private IndexOperationMessage handleEntityIndexUpdate(final QueueMessage message) {
+
+        Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEntityIndexUpdate" );
+
+        final AsyncEvent event = ( AsyncEvent ) message.getBody();
+
+        Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEntityIndexUpdate");
+        Preconditions.checkArgument(event instanceof EntityIndexEvent, String.format("Event Type for handleEntityIndexUpdate must be ENTITY_INDEX, got %s", event.getClass()));
+
+        final EntityIndexEvent entityIndexEvent = (EntityIndexEvent) event;
+
+
+        //process the entity immediately
+        //only process the same version, otherwise ignore
+        final EntityIdScope entityIdScope = entityIndexEvent.getEntityIdScope();
+        final ApplicationScope applicationScope = entityIdScope.getApplicationScope();
+        final Id entityId = entityIdScope.getId();
+        final long updatedAfter = entityIndexEvent.getUpdatedAfter();
+
+        final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, entityId, updatedAfter);
+
+        return eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null);
+    }
+
 
     @Override
     public void queueNewEdge(final ApplicationScope applicationScope,
                              final Entity entity,
                              final Edge newEdge) {
 
-        final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
+        offer( new EdgeIndexEvent( queueFig.getPrimaryRegion(), applicationScope, entity.getId(), newEdge ));
 
-        final IndexOperationMessage indexMessage = ecm.load( entity.getId() )
-            .flatMap( loadedEntity -> eventBuilder.buildNewEdge(applicationScope, entity, newEdge) )
-            .toBlocking().lastOrDefault(null);
+    }
 
-        queueIndexOperationMessage( indexMessage );
+    private IndexOperationMessage handleEdgeIndex(final QueueMessage message) {
+
+        Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEdgeIndex" );
+
+        final AsyncEvent event = (AsyncEvent) message.getBody();
+
+        Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEdgeIndex" );
+        Preconditions.checkArgument(event instanceof EdgeIndexEvent, String.format("Event Type for handleEdgeIndex must be EDGE_INDEX, got %s", event.getClass()));
+
+        final EdgeIndexEvent edgeIndexEvent = ( EdgeIndexEvent ) event;
+
+        final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( edgeIndexEvent.getApplicationScope() );
+
+        return ecm.load( edgeIndexEvent.getEntityId() )
+            .flatMap( loadedEntity -> eventBuilder.buildNewEdge(edgeIndexEvent.getApplicationScope(), loadedEntity, edgeIndexEvent.getEdge()) )
+            .toBlocking().lastOrDefault(null);
 
     }
 
@@ -417,7 +468,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ) );
     }
 
-    public void handleEdgeDelete(final QueueMessage message) {
+    private IndexOperationMessage  handleEdgeDelete(final QueueMessage message) {
 
         Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEdgeDelete" );
 
@@ -436,10 +487,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
             logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge);
         }
 
-        IndexOperationMessage indexMessage =
-            eventBuilder.buildDeleteEdge(applicationScope, edge).toBlocking().lastOrDefault(null);
-
-        queueIndexOperationMessage(indexMessage);
+        return eventBuilder.buildDeleteEdge(applicationScope, edge).toBlocking().lastOrDefault(null);
 
     }
 
@@ -478,7 +526,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         offerTopic( elasticsearchIndexEvent );
     }
 
-    public IndexOperationMessage handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){
+    private void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent) throws IndexDocNotFoundException {
 
         Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" );
 
@@ -529,7 +577,8 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         // always do a check to ensure the indexes are initialized for the index requests
         initializeEntityIndexes(indexOperationMessage);
 
-        return indexOperationMessage;
+        // send it to to be indexed
+        indexProducer.put(indexOperationMessage).toBlocking().last();
 
     }
 
@@ -599,7 +648,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ) );
     }
 
-    public void handleEntityDelete(final QueueMessage message) {
+    private IndexOperationMessage handleEntityDelete(final QueueMessage message) {
 
         Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityDelete");
 
@@ -625,14 +674,12 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
         entityDeleteResults.getCompactedNode().toBlocking().lastOrDefault(null);
 
-        IndexOperationMessage indexMessage = entityDeleteResults.getIndexObservable().toBlocking().lastOrDefault(null);
-
-        queueIndexOperationMessage(indexMessage);
+        return entityDeleteResults.getIndexObservable().toBlocking().lastOrDefault(null);
 
     }
 
 
-    public void handleInitializeApplicationIndex(final AsyncEvent event, final QueueMessage message) {
+    private void handleInitializeApplicationIndex(final AsyncEvent event, final QueueMessage message) {
         Preconditions.checkNotNull(message, "Queue Message cannot be null for handleInitializeApplicationIndex");
         Preconditions.checkArgument(event instanceof InitializeApplicationIndexEvent, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getClass()));
 
@@ -793,8 +840,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
             // collect into a list of QueueMessages that can be ack'd later
             .collect(Collectors.toList());
 
-        // sumbit the requests to Elasticsearch
-        indexProducer.put(combined).toBlocking().last();
+       queueIndexOperationMessage(combined);
 
         return queueMessages;
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/10ff27cc/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
index 7b2b228..bd581ad 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
@@ -38,6 +38,8 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
 @JsonIgnoreProperties( ignoreUnknown = true )
 @JsonTypeInfo( use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT, property = "type" )
 @JsonSubTypes( {
+    @JsonSubTypes.Type( value = EntityIndexEvent.class, name = "entityIndexEvent" ),
+    @JsonSubTypes.Type( value = EdgeIndexEvent.class, name = "edgeIndexEvent" ),
     @JsonSubTypes.Type( value = EdgeDeleteEvent.class, name = "edgeDeleteEvent" ),
     @JsonSubTypes.Type( value = EntityDeleteEvent.class, name = "entityDeleteEvent" ),
     @JsonSubTypes.Type( value = InitializeApplicationIndexEvent.class, name = "initializeApplicationIndexEvent" ),

http://git-wip-us.apache.org/repos/asf/usergrid/blob/10ff27cc/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java
new file mode 100644
index 0000000..6164dce
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents.model;
+
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+public final class EdgeIndexEvent
+    extends AsyncEvent {
+
+
+    @JsonProperty
+    protected ApplicationScope applicationScope;
+
+    @JsonProperty
+    protected Id entityId;
+
+    @JsonProperty
+    protected Edge edge;
+
+    /**
+     * Needed by jackson
+     */
+    public EdgeIndexEvent() {
+        super();
+    }
+
+    public EdgeIndexEvent(String sourceRegion, ApplicationScope applicationScope, Id entityId, Edge edge) {
+        super(sourceRegion);
+        this.applicationScope = applicationScope;
+        this.entityId = entityId;
+        this.edge = edge;
+    }
+
+
+    public ApplicationScope getApplicationScope() {
+        return applicationScope;
+    }
+
+
+    public Edge getEdge() {
+        return edge;
+    }
+
+
+    public Id getEntityId() {
+        return entityId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/10ff27cc/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
new file mode 100644
index 0000000..7e8184b
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+
+
+public final class EntityIndexEvent extends AsyncEvent {
+
+
+    @JsonProperty
+    protected EntityIdScope entityIdScope;
+
+    @JsonProperty
+    private long updatedAfter;
+
+    public EntityIndexEvent() {
+        super();
+    }
+
+    public EntityIndexEvent(String sourceRegion, EntityIdScope entityIdScope, final long updatedAfter ) {
+        super(sourceRegion);
+        this.entityIdScope = entityIdScope;
+        this.updatedAfter = updatedAfter;
+    }
+
+
+    public long getUpdatedAfter() {
+        return updatedAfter;
+    }
+
+
+    public EntityIdScope getEntityIdScope() {
+        return entityIdScope;
+    }
+}