You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/02/25 20:56:52 UTC

[06/15] usergrid git commit: Ensure indexBatch works with the new model for indexing.

Ensure indexBatch works with the new model for indexing.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/3c399e79
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/3c399e79
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/3c399e79

Branch: refs/heads/master
Commit: 3c399e790e16609bdc4a76853fcbc5ad562e8979
Parents: b4634dc
Author: Michael Russo <mi...@gmail.com>
Authored: Fri Feb 19 23:06:30 2016 -0800
Committer: Michael Russo <mi...@gmail.com>
Committed: Fri Feb 19 23:06:30 2016 -0800

----------------------------------------------------------------------
 .../asyncevents/AsyncEventServiceImpl.java      | 68 +++++++++++++------
 .../asyncevents/model/AsyncEvent.java           |  2 -
 .../asyncevents/model/EdgeIndexEvent.java       | 70 --------------------
 .../asyncevents/model/EntityIndexEvent.java     | 54 ---------------
 4 files changed, 49 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/3c399e79/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index e101761..dac3651 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -36,7 +36,6 @@ import org.apache.usergrid.corepersistence.asyncevents.model.AsyncEvent;
 import org.apache.usergrid.corepersistence.asyncevents.model.EdgeDeleteEvent;
 import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent;
 import org.apache.usergrid.corepersistence.asyncevents.model.EntityDeleteEvent;
-import org.apache.usergrid.corepersistence.asyncevents.model.EntityIndexEvent;
 import org.apache.usergrid.corepersistence.asyncevents.model.InitializeApplicationIndexEvent;
 import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
 import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
@@ -298,12 +297,12 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
             } catch (ClassCastException cce) {
                 logger.error("Failed to deserialize message body", cce);
-                return new IndexEventResult(Optional.absent(), System.currentTimeMillis());
+                return new IndexEventResult(Optional.absent(), Optional.absent(), System.currentTimeMillis());
             }
 
             if (event == null) {
                 logger.error("AsyncEvent type or event is null!");
-                return new IndexEventResult(Optional.absent(), System.currentTimeMillis());
+                return new IndexEventResult(Optional.absent(), Optional.absent(), System.currentTimeMillis());
             }
 
             final AsyncEvent thisEvent = event;
@@ -312,6 +311,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
                 logger.debug("Processing {} event", event);
             }
 
+            IndexOperationMessage indexOperationMessage = null;
             try {
 
                 // deletes are 2-part, actual IO to delete data, then queue up a de-index
@@ -332,7 +332,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
                 // this is the main event that pulls the index doc from map persistence and hands to the index producer
                 else if (event instanceof ElasticsearchIndexEvent) {
 
-                    handleIndexOperation((ElasticsearchIndexEvent) event);
+                    indexOperationMessage = handleIndexOperation((ElasticsearchIndexEvent) event);
 
                 } else {
 
@@ -341,20 +341,20 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
 
                 //return type that can be indexed and ack'd later
-                return new IndexEventResult(Optional.of(message), thisEvent.getCreationTime());
+                return new IndexEventResult(Optional.fromNullable(indexOperationMessage), Optional.of(message), thisEvent.getCreationTime());
 
             } catch (IndexDocNotFoundException e){
 
                 // this exception is throw when we wait before trying quorum read on map persistence.
                 // return empty event result so the event's message doesn't get ack'd
                 logger.info(e.getMessage());
-                return new IndexEventResult(Optional.absent(), event.getCreationTime());
+                return new IndexEventResult(Optional.absent(), Optional.absent(), event.getCreationTime());
 
             } catch (Exception e) {
 
                 // if the event fails to process, log the message and return empty event result so it doesn't get ack'd
                 logger.error("Failed to process message: {} {}", message.getMessageId(), message.getStringBody(), e);
-                return new IndexEventResult(Optional.absent(), event.getCreationTime());
+                return new IndexEventResult(Optional.absent(), Optional.absent(), event.getCreationTime());
             }
         });
 
@@ -407,6 +407,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     public void queueDeleteEdge(final ApplicationScope applicationScope,
                                 final Edge edge) {
 
+        // sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op
         offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ) );
     }
 
@@ -471,7 +472,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         offerTopic( elasticsearchIndexEvent );
     }
 
-    public void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){
+    public IndexOperationMessage handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){
          Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" );
 
         final UUID messageId = elasticsearchIndexEvent.getIndexBatchId();
@@ -525,7 +526,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
 
         //now execute it
-        indexProducer.put(indexOperationMessage).toBlocking().last();
+        return indexOperationMessage;
 
     }
 
@@ -568,6 +569,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     @Override
     public void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId) {
 
+        // sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op
         offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ) );
     }
 
@@ -699,7 +701,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
                                                  try {
                                                      List<IndexEventResult> indexEventResults = callEventHandlers( messages );
-                                                     List<QueueMessage> messagesToAck = ackMessages( indexEventResults );
+                                                     List<QueueMessage> messagesToAck = submitToIndex( indexEventResults );
 
                                                      if ( messagesToAck == null || messagesToAck.size() == 0 ) {
                                                          logger.error(
@@ -738,17 +740,21 @@ public class AsyncEventServiceImpl implements AsyncEventService {
      * @param indexEventResults
      * @return
      */
-    private List<QueueMessage> ackMessages(List<IndexEventResult> indexEventResults) {
+    private List<QueueMessage> submitToIndex(List<IndexEventResult> indexEventResults) {
         //if nothing came back then return null
         if(indexEventResults==null){
             return null;
         }
+        IndexOperationMessage combined = new IndexOperationMessage();
 
         // stream the messages to record the cycle time
-        return indexEventResults.stream()
+        List<QueueMessage> queueMessages = indexEventResults.stream()
             .map(indexEventResult -> {
                 //record the cycle time
                 messageCycle.update(System.currentTimeMillis() - indexEventResult.getCreationTime());
+                if(indexEventResult.getIndexOperationMessage().isPresent()){
+                    combined.ingest(indexEventResult.getIndexOperationMessage().get());
+                }
                 return indexEventResult;
             })
             // filter out messages that are not present, they were not processed and put into the results
@@ -756,34 +762,58 @@ public class AsyncEventServiceImpl implements AsyncEventService {
             .map(result -> result.getQueueMessage().get())
             // collect
             .collect(Collectors.toList());
+
+        // sumbit the requests to Elasticsearch
+        indexProducer.put(combined).toBlocking().last();
+
+        return queueMessages;
     }
 
     public void index(final ApplicationScope applicationScope, final Id id, final long updatedSince) {
-        //change to id scope to avoid serialization issues
-        offer( new EntityIndexEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, id ), updatedSince ) );
+
+        EntityIndexOperation entityIndexOperation =
+            new EntityIndexOperation( applicationScope, id, updatedSince);
+
+        queueIndexOperationMessage(eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null));
     }
 
     public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {
 
-        List batch = new ArrayList<EdgeScope>();
+        IndexOperationMessage batch = new IndexOperationMessage();
+
         for ( EdgeScope e : edges){
-            //change to id scope to avoid serialization issues
-            batch.add(new EntityIndexEvent(queueFig.getPrimaryRegion(), new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()), updatedSince));
+
+            EntityIndexOperation entityIndexOperation =
+                new EntityIndexOperation( e.getApplicationScope(), e.getEdge().getTargetNode(), updatedSince);
+
+            IndexOperationMessage indexOperationMessage =
+                eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null);
+
+            if (indexOperationMessage != null){
+                batch.ingest(indexOperationMessage);
+            }
+
         }
-        offerBatch( batch );
+
+        queueIndexOperationMessage(batch);
     }
 
 
     public class IndexEventResult{
+        private final Optional<IndexOperationMessage> indexOperationMessage;
         private final Optional<QueueMessage> queueMessage;
         private final long creationTime;
 
-        public IndexEventResult(Optional<QueueMessage> queueMessage, long creationTime){
+        public IndexEventResult(Optional<IndexOperationMessage> indexOperationMessage, Optional<QueueMessage> queueMessage, long creationTime){
 
             this.queueMessage = queueMessage;
             this.creationTime = creationTime;
+            this.indexOperationMessage = indexOperationMessage;
         }
 
+        public Optional<IndexOperationMessage> getIndexOperationMessage() {
+            return indexOperationMessage;
+        }
 
         public Optional<QueueMessage> getQueueMessage() {
             return queueMessage;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3c399e79/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
index 5f86410..57b5812 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
@@ -40,9 +40,7 @@ import org.apache.usergrid.persistence.queue.QueueFig;
 @JsonTypeInfo( use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT, property = "type" )
 @JsonSubTypes( {
     @JsonSubTypes.Type( value = EdgeDeleteEvent.class, name = "edgeDeleteEvent" ),
-    @JsonSubTypes.Type( value = EdgeIndexEvent.class, name = "edgeIndexEvent" ),
     @JsonSubTypes.Type( value = EntityDeleteEvent.class, name = "entityDeleteEvent" ),
-    @JsonSubTypes.Type( value = EntityIndexEvent.class, name = "entityIndexEvent" ),
     @JsonSubTypes.Type( value = InitializeApplicationIndexEvent.class, name = "initializeApplicationIndexEvent" ),
     @JsonSubTypes.Type( value = ElasticsearchIndexEvent.class, name = "elasticsearchIndexEvent" )
 } )

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3c399e79/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java
deleted file mode 100644
index 6164dce..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.asyncevents.model;
-
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-public final class EdgeIndexEvent
-    extends AsyncEvent {
-
-
-    @JsonProperty
-    protected ApplicationScope applicationScope;
-
-    @JsonProperty
-    protected Id entityId;
-
-    @JsonProperty
-    protected Edge edge;
-
-    /**
-     * Needed by jackson
-     */
-    public EdgeIndexEvent() {
-        super();
-    }
-
-    public EdgeIndexEvent(String sourceRegion, ApplicationScope applicationScope, Id entityId, Edge edge) {
-        super(sourceRegion);
-        this.applicationScope = applicationScope;
-        this.entityId = entityId;
-        this.edge = edge;
-    }
-
-
-    public ApplicationScope getApplicationScope() {
-        return applicationScope;
-    }
-
-
-    public Edge getEdge() {
-        return edge;
-    }
-
-
-    public Id getEntityId() {
-        return entityId;
-    }
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3c399e79/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
deleted file mode 100644
index 7e8184b..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.asyncevents.model;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
-
-
-public final class EntityIndexEvent extends AsyncEvent {
-
-
-    @JsonProperty
-    protected EntityIdScope entityIdScope;
-
-    @JsonProperty
-    private long updatedAfter;
-
-    public EntityIndexEvent() {
-        super();
-    }
-
-    public EntityIndexEvent(String sourceRegion, EntityIdScope entityIdScope, final long updatedAfter ) {
-        super(sourceRegion);
-        this.entityIdScope = entityIdScope;
-        this.updatedAfter = updatedAfter;
-    }
-
-
-    public long getUpdatedAfter() {
-        return updatedAfter;
-    }
-
-
-    public EntityIdScope getEntityIdScope() {
-        return entityIdScope;
-    }
-}