You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/02/25 20:56:34 UTC
[05/10] usergrid git commit: Ensure indexBatch works with the new
model for indexing.
Ensure indexBatch works with the new model for indexing.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/3c399e79
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/3c399e79
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/3c399e79
Branch: refs/heads/release-2.1.1
Commit: 3c399e790e16609bdc4a76853fcbc5ad562e8979
Parents: b4634dc
Author: Michael Russo <mi...@gmail.com>
Authored: Fri Feb 19 23:06:30 2016 -0800
Committer: Michael Russo <mi...@gmail.com>
Committed: Fri Feb 19 23:06:30 2016 -0800
----------------------------------------------------------------------
.../asyncevents/AsyncEventServiceImpl.java | 68 +++++++++++++------
.../asyncevents/model/AsyncEvent.java | 2 -
.../asyncevents/model/EdgeIndexEvent.java | 70 --------------------
.../asyncevents/model/EntityIndexEvent.java | 54 ---------------
4 files changed, 49 insertions(+), 145 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3c399e79/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index e101761..dac3651 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -36,7 +36,6 @@ import org.apache.usergrid.corepersistence.asyncevents.model.AsyncEvent;
import org.apache.usergrid.corepersistence.asyncevents.model.EdgeDeleteEvent;
import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent;
import org.apache.usergrid.corepersistence.asyncevents.model.EntityDeleteEvent;
-import org.apache.usergrid.corepersistence.asyncevents.model.EntityIndexEvent;
import org.apache.usergrid.corepersistence.asyncevents.model.InitializeApplicationIndexEvent;
import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
@@ -298,12 +297,12 @@ public class AsyncEventServiceImpl implements AsyncEventService {
} catch (ClassCastException cce) {
logger.error("Failed to deserialize message body", cce);
- return new IndexEventResult(Optional.absent(), System.currentTimeMillis());
+ return new IndexEventResult(Optional.absent(), Optional.absent(), System.currentTimeMillis());
}
if (event == null) {
logger.error("AsyncEvent type or event is null!");
- return new IndexEventResult(Optional.absent(), System.currentTimeMillis());
+ return new IndexEventResult(Optional.absent(), Optional.absent(), System.currentTimeMillis());
}
final AsyncEvent thisEvent = event;
@@ -312,6 +311,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
logger.debug("Processing {} event", event);
}
+ IndexOperationMessage indexOperationMessage = null;
try {
// deletes are 2-part, actual IO to delete data, then queue up a de-index
@@ -332,7 +332,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
// this is the main event that pulls the index doc from map persistence and hands to the index producer
else if (event instanceof ElasticsearchIndexEvent) {
- handleIndexOperation((ElasticsearchIndexEvent) event);
+ indexOperationMessage = handleIndexOperation((ElasticsearchIndexEvent) event);
} else {
@@ -341,20 +341,20 @@ public class AsyncEventServiceImpl implements AsyncEventService {
//return type that can be indexed and ack'd later
- return new IndexEventResult(Optional.of(message), thisEvent.getCreationTime());
+ return new IndexEventResult(Optional.fromNullable(indexOperationMessage), Optional.of(message), thisEvent.getCreationTime());
} catch (IndexDocNotFoundException e){
// this exception is throw when we wait before trying quorum read on map persistence.
// return empty event result so the event's message doesn't get ack'd
logger.info(e.getMessage());
- return new IndexEventResult(Optional.absent(), event.getCreationTime());
+ return new IndexEventResult(Optional.absent(), Optional.absent(), event.getCreationTime());
} catch (Exception e) {
// if the event fails to process, log the message and return empty event result so it doesn't get ack'd
logger.error("Failed to process message: {} {}", message.getMessageId(), message.getStringBody(), e);
- return new IndexEventResult(Optional.absent(), event.getCreationTime());
+ return new IndexEventResult(Optional.absent(), Optional.absent(), event.getCreationTime());
}
});
@@ -407,6 +407,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
public void queueDeleteEdge(final ApplicationScope applicationScope,
final Edge edge) {
+ // sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op
offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ) );
}
@@ -471,7 +472,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
offerTopic( elasticsearchIndexEvent );
}
- public void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){
+ public IndexOperationMessage handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){
Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" );
final UUID messageId = elasticsearchIndexEvent.getIndexBatchId();
@@ -525,7 +526,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
//now execute it
- indexProducer.put(indexOperationMessage).toBlocking().last();
+ return indexOperationMessage;
}
@@ -568,6 +569,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
@Override
public void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId) {
+ // sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op
offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ) );
}
@@ -699,7 +701,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
try {
List<IndexEventResult> indexEventResults = callEventHandlers( messages );
- List<QueueMessage> messagesToAck = ackMessages( indexEventResults );
+ List<QueueMessage> messagesToAck = submitToIndex( indexEventResults );
if ( messagesToAck == null || messagesToAck.size() == 0 ) {
logger.error(
@@ -738,17 +740,21 @@ public class AsyncEventServiceImpl implements AsyncEventService {
* @param indexEventResults
* @return
*/
- private List<QueueMessage> ackMessages(List<IndexEventResult> indexEventResults) {
+ private List<QueueMessage> submitToIndex(List<IndexEventResult> indexEventResults) {
//if nothing came back then return null
if(indexEventResults==null){
return null;
}
+ IndexOperationMessage combined = new IndexOperationMessage();
// stream the messages to record the cycle time
- return indexEventResults.stream()
+ List<QueueMessage> queueMessages = indexEventResults.stream()
.map(indexEventResult -> {
//record the cycle time
messageCycle.update(System.currentTimeMillis() - indexEventResult.getCreationTime());
+ if(indexEventResult.getIndexOperationMessage().isPresent()){
+ combined.ingest(indexEventResult.getIndexOperationMessage().get());
+ }
return indexEventResult;
})
// filter out messages that are not present, they were not processed and put into the results
@@ -756,34 +762,58 @@ public class AsyncEventServiceImpl implements AsyncEventService {
.map(result -> result.getQueueMessage().get())
// collect
.collect(Collectors.toList());
+
+ // sumbit the requests to Elasticsearch
+ indexProducer.put(combined).toBlocking().last();
+
+ return queueMessages;
}
public void index(final ApplicationScope applicationScope, final Id id, final long updatedSince) {
- //change to id scope to avoid serialization issues
- offer( new EntityIndexEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, id ), updatedSince ) );
+
+ EntityIndexOperation entityIndexOperation =
+ new EntityIndexOperation( applicationScope, id, updatedSince);
+
+ queueIndexOperationMessage(eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null));
}
public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {
- List batch = new ArrayList<EdgeScope>();
+ IndexOperationMessage batch = new IndexOperationMessage();
+
for ( EdgeScope e : edges){
- //change to id scope to avoid serialization issues
- batch.add(new EntityIndexEvent(queueFig.getPrimaryRegion(), new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()), updatedSince));
+
+ EntityIndexOperation entityIndexOperation =
+ new EntityIndexOperation( e.getApplicationScope(), e.getEdge().getTargetNode(), updatedSince);
+
+ IndexOperationMessage indexOperationMessage =
+ eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null);
+
+ if (indexOperationMessage != null){
+ batch.ingest(indexOperationMessage);
+ }
+
}
- offerBatch( batch );
+
+ queueIndexOperationMessage(batch);
}
public class IndexEventResult{
+ private final Optional<IndexOperationMessage> indexOperationMessage;
private final Optional<QueueMessage> queueMessage;
private final long creationTime;
- public IndexEventResult(Optional<QueueMessage> queueMessage, long creationTime){
+ public IndexEventResult(Optional<IndexOperationMessage> indexOperationMessage, Optional<QueueMessage> queueMessage, long creationTime){
this.queueMessage = queueMessage;
this.creationTime = creationTime;
+ this.indexOperationMessage = indexOperationMessage;
}
+ public Optional<IndexOperationMessage> getIndexOperationMessage() {
+ return indexOperationMessage;
+ }
public Optional<QueueMessage> getQueueMessage() {
return queueMessage;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3c399e79/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
index 5f86410..57b5812 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
@@ -40,9 +40,7 @@ import org.apache.usergrid.persistence.queue.QueueFig;
@JsonTypeInfo( use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT, property = "type" )
@JsonSubTypes( {
@JsonSubTypes.Type( value = EdgeDeleteEvent.class, name = "edgeDeleteEvent" ),
- @JsonSubTypes.Type( value = EdgeIndexEvent.class, name = "edgeIndexEvent" ),
@JsonSubTypes.Type( value = EntityDeleteEvent.class, name = "entityDeleteEvent" ),
- @JsonSubTypes.Type( value = EntityIndexEvent.class, name = "entityIndexEvent" ),
@JsonSubTypes.Type( value = InitializeApplicationIndexEvent.class, name = "initializeApplicationIndexEvent" ),
@JsonSubTypes.Type( value = ElasticsearchIndexEvent.class, name = "elasticsearchIndexEvent" )
} )
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3c399e79/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java
deleted file mode 100644
index 6164dce..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.asyncevents.model;
-
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-public final class EdgeIndexEvent
- extends AsyncEvent {
-
-
- @JsonProperty
- protected ApplicationScope applicationScope;
-
- @JsonProperty
- protected Id entityId;
-
- @JsonProperty
- protected Edge edge;
-
- /**
- * Needed by jackson
- */
- public EdgeIndexEvent() {
- super();
- }
-
- public EdgeIndexEvent(String sourceRegion, ApplicationScope applicationScope, Id entityId, Edge edge) {
- super(sourceRegion);
- this.applicationScope = applicationScope;
- this.entityId = entityId;
- this.edge = edge;
- }
-
-
- public ApplicationScope getApplicationScope() {
- return applicationScope;
- }
-
-
- public Edge getEdge() {
- return edge;
- }
-
-
- public Id getEntityId() {
- return entityId;
- }
-}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3c399e79/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
deleted file mode 100644
index 7e8184b..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.asyncevents.model;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
-
-
-public final class EntityIndexEvent extends AsyncEvent {
-
-
- @JsonProperty
- protected EntityIdScope entityIdScope;
-
- @JsonProperty
- private long updatedAfter;
-
- public EntityIndexEvent() {
- super();
- }
-
- public EntityIndexEvent(String sourceRegion, EntityIdScope entityIdScope, final long updatedAfter ) {
- super(sourceRegion);
- this.entityIdScope = entityIdScope;
- this.updatedAfter = updatedAfter;
- }
-
-
- public long getUpdatedAfter() {
- return updatedAfter;
- }
-
-
- public EntityIdScope getEntityIdScope() {
- return entityIdScope;
- }
-}