You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/03/17 00:37:36 UTC
[08/50] incubator-usergrid git commit: Refactored index messages to
be serializable.
Refactored index messages to be serializable.
Consumers now rolls up index requests and flushes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/23919745
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/23919745
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/23919745
Branch: refs/heads/two-dot-o
Commit: 23919745883ff9d60c18ce75fc1a056a017bce37
Parents: cd0015d
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 10 16:12:34 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 10 16:12:34 2015 -0600
----------------------------------------------------------------------
stack/corepersistence/queryindex/pom.xml | 8 +
.../usergrid/persistence/index/IndexFig.java | 7 +
.../index/IndexOperationMessage.java | 29 +++-
.../persistence/index/guice/IndexModule.java | 2 +
.../persistence/index/impl/BatchRequest.java | 41 +++++
.../index/impl/BufferQueueInMemory.java | 87 ----------
.../index/impl/BufferQueueInMemoryImpl.java | 87 ++++++++++
.../index/impl/BufferQueueSQSImpl.java | 158 +++++++++++++++++++
.../persistence/index/impl/DeIndexRequest.java | 106 +++++++++++++
.../index/impl/EsEntityIndexBatchImpl.java | 39 ++---
.../index/impl/EsEntityIndexImpl.java | 2 +-
.../index/impl/EsIndexBufferConsumerImpl.java | 37 ++---
.../persistence/index/impl/IndexRequest.java | 117 ++++++++++++++
.../index/guice/TestIndexModule.java | 6 +-
.../persistence/queue/QueueManager.java | 4 +-
15 files changed, 581 insertions(+), 149 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/pom.xml b/stack/corepersistence/queryindex/pom.xml
index f6ae718..a5fbf6a 100644
--- a/stack/corepersistence/queryindex/pom.xml
+++ b/stack/corepersistence/queryindex/pom.xml
@@ -108,6 +108,14 @@
</dependency>
<dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>queue</artifactId>
+ <version>${project.version}</version>
+ <type>jar</type>
+ </dependency>
+
+
+ <dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index befbaa9..ce14449 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -55,6 +55,8 @@ public interface IndexFig extends GuicyFig {
public static final String INDEX_BUFFER_TIMEOUT = "elasticsearch.buffer_timeout";
+ public static final String INDEX_QUEUE_READ_TIMEOUT = "elasticsearch.queue_read_timeout";
+
public static final String INDEX_BATCH_SIZE = "elasticsearch.batch_size";
public static final String INDEX_WRITE_CONSISTENCY_LEVEL = "elasticsearch.write_consistency_level";
@@ -167,4 +169,9 @@ public interface IndexFig extends GuicyFig {
@Default("1000")
@Key( FAILURE_REJECTED_RETRY_WAIT_TIME )
long getFailureRetryTime();
+
+ //give us 60 seconds to process the message
+ @Default("60")
+ @Key(INDEX_QUEUE_READ_TIMEOUT)
+ int getIndexQueueTimeout();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
index 944a71f..43eaa01 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
@@ -17,23 +17,28 @@
package org.apache.usergrid.persistence.index;
import org.apache.usergrid.persistence.core.future.BetterFuture;
+import org.apache.usergrid.persistence.index.impl.BatchRequest;
+
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
+import java.io.Serializable;
+import java.util.HashSet;
import java.util.Iterator;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* Container for index operations.
*/
-public class IndexOperationMessage {
- private final ConcurrentLinkedQueue<ActionRequestBuilder> builders;
+public class IndexOperationMessage implements Serializable {
+ private final Set<BatchRequest> builders;
private final BetterFuture<IndexOperationMessage> containerFuture;
public IndexOperationMessage(){
final IndexOperationMessage parent = this;
- builders = new ConcurrentLinkedQueue<>();
+ this.builders = new HashSet<>();
this.containerFuture = new BetterFuture<>(new Callable<IndexOperationMessage>() {
@Override
public IndexOperationMessage call() throws Exception {
@@ -42,7 +47,21 @@ public class IndexOperationMessage {
});
}
- public void addOperation(ActionRequestBuilder builder){
+
+ /**
+ * Add all our operations in the set
+ * @param requests
+ */
+ public void setOperations(final Set<BatchRequest> requests){
+ this.builders.addAll( requests);
+ }
+
+
+ /**
+ * Add the operation to the set
+ * @param builder
+ */
+ public void addOperation(BatchRequest builder){
builders.add(builder);
}
@@ -50,7 +69,7 @@ public class IndexOperationMessage {
* return operations for the message
* @return
*/
- public ConcurrentLinkedQueue<ActionRequestBuilder> getOperations(){
+ public Set<BatchRequest> getOperations(){
return builders;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
index d911dab..b03e1c0 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
@@ -28,6 +28,7 @@ import org.apache.usergrid.persistence.index.impl.EsEntityIndexImpl;
import org.apache.usergrid.persistence.index.impl.EsIndexBufferConsumerImpl;
import org.apache.usergrid.persistence.index.impl.EsIndexBufferProducerImpl;
import org.apache.usergrid.persistence.map.guice.MapModule;
+import org.apache.usergrid.persistence.queue.guice.QueueModule;
import org.safehaus.guicyfig.GuicyFigModule;
@@ -41,6 +42,7 @@ public abstract class IndexModule extends AbstractModule {
install(new GuicyFigModule(IndexFig.class));
install(new MapModule());
+ install(new QueueModule());
bind(EntityIndexFactory.class).to( EsEntityIndexFactoryImpl.class );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BatchRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BatchRequest.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BatchRequest.java
new file mode 100644
index 0000000..df6c5b0
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BatchRequest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.index.impl;
+
+
+import java.io.Serializable;
+
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.client.Client;
+
+
+/**
+ * A batch request we can serialize and construct on receive
+ */
+public interface BatchRequest extends Serializable {
+
+
+ /**
+ * Passing the client and the bulk request, add ourselves to the bulk request
+ * @param client
+ * @param bulkRequest
+ */
+ public void doOperation(final Client client, final BulkRequestBuilder bulkRequest );
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java
deleted file mode 100644
index 403762f..0000000
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.index.impl;
-
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.usergrid.persistence.index.IndexFig;
-import org.apache.usergrid.persistence.index.IndexOperationMessage;
-
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-
-@Singleton
-public class BufferQueueInMemory implements BufferQueue {
-
- private final ArrayBlockingQueue<IndexOperationMessage> messages;
-
-
- @Inject
- public BufferQueueInMemory(final IndexFig fig ) {
- messages = new ArrayBlockingQueue<>( fig.getIndexQueueSize() );
- }
-
-
- @Override
- public void offer( final IndexOperationMessage operation ) {
- messages.offer( operation );
- }
-
-
- @Override
- public List<IndexOperationMessage> take( final int takeSize, final long timeout, final TimeUnit timeUnit ) {
-
- final List<IndexOperationMessage> response = new ArrayList<>( takeSize );
-
- final long endTime = System.currentTimeMillis() + timeUnit.toMillis( timeout );
-
- //loop until we're we're full or we time out
- do {
- try {
- //we received 1, try to drain
- IndexOperationMessage polled = messages.poll( timeout, timeUnit );
-
- //drain
- if ( polled != null ) {
- response.add( polled );
- messages.drainTo( response, takeSize - response.size() );
- }
- }
- catch ( InterruptedException ie ) {
- //swallow
-
- }
- }
- while ( response.size() < takeSize && System.currentTimeMillis() < endTime );
-
- return response;
- }
-
-
- @Override
- public void ack( final List<IndexOperationMessage> messages ) {
- //no op for this
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
new file mode 100644
index 0000000..ef0ef5f
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.index.impl;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.usergrid.persistence.index.IndexFig;
+import org.apache.usergrid.persistence.index.IndexOperationMessage;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+@Singleton
+public class BufferQueueInMemoryImpl implements BufferQueue {
+
+ private final ArrayBlockingQueue<IndexOperationMessage> messages;
+
+
+ @Inject
+ public BufferQueueInMemoryImpl( final IndexFig fig ) {
+ messages = new ArrayBlockingQueue<>( fig.getIndexQueueSize() );
+ }
+
+
+ @Override
+ public void offer( final IndexOperationMessage operation ) {
+ messages.offer( operation );
+ }
+
+
+ @Override
+ public List<IndexOperationMessage> take( final int takeSize, final long timeout, final TimeUnit timeUnit ) {
+
+ final List<IndexOperationMessage> response = new ArrayList<>( takeSize );
+
+ final long endTime = System.currentTimeMillis() + timeUnit.toMillis( timeout );
+
+ //loop until we're we're full or we time out
+ do {
+ try {
+ //we received 1, try to drain
+ IndexOperationMessage polled = messages.poll( timeout, timeUnit );
+
+ //drain
+ if ( polled != null ) {
+ response.add( polled );
+ messages.drainTo( response, takeSize - response.size() );
+ }
+ }
+ catch ( InterruptedException ie ) {
+ //swallow
+
+ }
+ }
+ while ( response.size() < takeSize && System.currentTimeMillis() < endTime );
+
+ return response;
+ }
+
+
+ @Override
+ public void ack( final List<IndexOperationMessage> messages ) {
+ //no op for this
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
new file mode 100644
index 0000000..b814603
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.index.impl;
+
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.elasticsearch.action.ActionRequestBuilder;
+
+import org.apache.usergrid.persistence.index.IndexFig;
+import org.apache.usergrid.persistence.index.IndexOperationMessage;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+import org.apache.usergrid.persistence.queue.QueueMessage;
+import org.apache.usergrid.persistence.queue.QueueScope;
+import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+@Singleton
+public class BufferQueueSQSImpl implements BufferQueue {
+
+ /** Hacky, copied from CPEntityManager b/c we can't access it here */
+ public static final UUID MANAGEMENT_APPLICATION_ID = UUID.fromString( "b6768a08-b5d5-11e3-a495-11ddb1de66c8" );
+
+
+ private static final String QUEUE_NAME = "es_queue";
+
+ private final QueueManager queue;
+ private final IndexFig indexFig;
+
+
+ @Inject
+ public BufferQueueSQSImpl( final QueueManagerFactory queueManagerFactory, final IndexFig indexFig ) {
+ final QueueScope scope =
+ new QueueScopeImpl( new SimpleId( MANAGEMENT_APPLICATION_ID, "application" ), QUEUE_NAME );
+
+ this.queue = queueManagerFactory.getQueueManager( scope );
+ this.indexFig = indexFig;
+ }
+
+
+ @Override
+ public void offer( final IndexOperationMessage operation ) {
+ final Message toQueue = new Message( operation.getOperations() );
+
+
+
+
+ try {
+ this.queue.sendMessage( toQueue );
+ operation.getFuture().run();
+ }
+ catch ( IOException e ) {
+ throw new RuntimeException( "Unable to queue message", e );
+ }
+ }
+
+
+ @Override
+ public List<IndexOperationMessage> take( final int takeSize, final long timeout, final TimeUnit timeUnit ) {
+
+ //loop until we're we're full or we time out
+ List<QueueMessage> messages = queue
+ .getMessages( takeSize, indexFig.getIndexQueueTimeout(), ( int ) timeUnit.toMillis( timeout ),
+ Message.class );
+
+
+ final List<IndexOperationMessage> response = new ArrayList<>( messages.size() );
+
+ for ( final QueueMessage message : messages ) {
+
+ SqsIndexOperationMessage operation = new SqsIndexOperationMessage( message );
+
+ operation.setOperations( ( ( Message ) message.getBody() ).getData() );
+
+ response.add( operation );
+ }
+
+ return response;
+ }
+
+
+ @Override
+ public void ack( final List<IndexOperationMessage> messages ) {
+
+ List<QueueMessage> toAck = new ArrayList<>( messages.size() );
+
+ for(IndexOperationMessage ioe: messages){
+ toAck.add( ((SqsIndexOperationMessage)ioe).getMessage() );
+ }
+
+ queue.commitMessages( toAck );
+ }
+
+
+ /**
+ * The message to queue to SQS
+ */
+ public static final class Message implements Serializable {
+ private final Set<BatchRequest> data;
+
+
+ private Message( final Set<BatchRequest> data ) {this.data = data;}
+
+
+ public Set<BatchRequest> getData() {
+ return data;
+ }
+ }
+
+
+ /**
+ * The message that subclasses our IndexOperationMessage. holds a pointer to the original message
+ */
+ public class SqsIndexOperationMessage extends IndexOperationMessage {
+
+ private final QueueMessage message;
+
+
+ public SqsIndexOperationMessage( final QueueMessage message ) {this.message = message;}
+
+
+ /**
+ * Get the message from our queue
+ */
+ public QueueMessage getMessage() {
+ return message;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
new file mode 100644
index 0000000..a279f16
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.index.impl;
+
+
+import java.util.Arrays;
+
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.delete.DeleteRequestBuilder;
+import org.elasticsearch.client.Client;
+
+
+/**
+ * Represent the properties required to build an index request
+ */
+public class DeIndexRequest implements BatchRequest {
+
+ public final String[] indexes;
+ public final String entityType;
+ public final String documentId;
+
+
+ public DeIndexRequest( final String[] indexes, final String entityType, final String documentId) {
+ this.indexes = indexes;
+ this.entityType = entityType;
+ this.documentId = documentId;
+ }
+
+
+ @Override
+ public void doOperation(final Client client, final BulkRequestBuilder bulkRequest ){
+
+
+ for(final String index: indexes) {
+ final DeleteRequestBuilder builder = client.prepareDelete( index, entityType, documentId);
+
+ bulkRequest.add( builder );
+ }
+ }
+
+
+ public String[] getIndexes() {
+ return indexes;
+ }
+
+
+ public String getEntityType() {
+ return entityType;
+ }
+
+
+ public String getDocumentId() {
+ return documentId;
+ }
+
+
+ @Override
+ public boolean equals( final Object o ) {
+ if ( this == o ) {
+ return true;
+ }
+ if ( o == null || getClass() != o.getClass() ) {
+ return false;
+ }
+
+ final DeIndexRequest that = ( DeIndexRequest ) o;
+
+ if ( !documentId.equals( that.documentId ) ) {
+ return false;
+ }
+ if ( !entityType.equals( that.entityType ) ) {
+ return false;
+ }
+ if ( !Arrays.equals( indexes, that.indexes ) ) {
+ return false;
+ }
+
+ return true;
+ }
+
+
+ @Override
+ public int hashCode() {
+ int result = Arrays.hashCode( indexes );
+ result = 31 * result + entityType.hashCode();
+ result = 31 * result + documentId.hashCode();
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index d987b29..b0c731e 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -73,10 +73,6 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
private final ApplicationScope applicationScope;
- private final Client client;
-
- private final boolean refresh;
-
private final IndexIdentifier.IndexAlias alias;
private final IndexIdentifier indexIdentifier;
@@ -85,21 +81,17 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
private final AliasedEntityIndex entityIndex;
private IndexOperationMessage container;
- private final Timer batchTimer;
- public EsEntityIndexBatchImpl(final ApplicationScope applicationScope, final Client client,
+ public EsEntityIndexBatchImpl(final ApplicationScope applicationScope,
final IndexBufferProducer indexBatchBufferProducer,final IndexFig config,
- final AliasedEntityIndex entityIndex,final MetricsFactory metricsFactory ) {
+ final AliasedEntityIndex entityIndex ) {
this.applicationScope = applicationScope;
- this.client = client;
this.indexBatchBufferProducer = indexBatchBufferProducer;
this.entityIndex = entityIndex;
this.indexIdentifier = IndexingUtils.createIndexIdentifier(config, applicationScope);
this.alias = indexIdentifier.getAlias();
- this.refresh = config.isForcedRefresh();
- this.batchTimer = metricsFactory.getTimer( EsEntityIndexBatchImpl.class, "entity.index.batch.timer" );
//constrained
this.container = new IndexOperationMessage();
}
@@ -133,9 +125,10 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
log.debug( "Indexing entity documentId {} data {} ", indexId, entityAsMap );
final String entityType = entity.getId().getType();
- IndexRequestBuilder builder =
- client.prepareIndex(alias.getWriteAlias(), entityType, indexId).setSource( entityAsMap );
- container.addOperation(builder);
+
+
+ container.addOperation(new IndexRequest(alias.getWriteAlias(), entityType, indexId, entityAsMap));
+
return this;
}
@@ -174,23 +167,9 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
if(indexes == null ||indexes.length == 0){
indexes = new String[]{indexIdentifier.getIndex(null)};
}
- //get all indexes then flush everyone
- Timer.Context timeDeindex = batchTimer.time();
- Observable.from(indexes)
- .map(new Func1<String, Object>() {
- @Override
- public Object call(String index) {
- try {
- DeleteRequestBuilder builder = client.prepareDelete(index, entityType, indexId).setRefresh(refresh);
- container.addOperation(builder);
- }catch (Exception e){
- log.error("failed to deindex",e);
- throw e;
- }
- return index;
- }
- }).toBlocking().last();
- timeDeindex.stop();
+
+ container.addOperation( new DeIndexRequest( indexes, entityType, indexId ) );
+
log.debug("Deindexed Entity with index id " + indexId);
return this;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index a1a8ca7..7bdb41a 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -400,7 +400,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
@Override
public EntityIndexBatch createBatch() {
EntityIndexBatch batch = new EsEntityIndexBatchImpl(
- applicationScope, esProvider.getClient(),indexBatchBufferProducer, config, this, metricsFactory );
+ applicationScope, indexBatchBufferProducer, config, this );
return batch;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 45c12a1..2342398 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -26,7 +26,6 @@ import com.google.inject.Inject;
import com.google.inject.Singleton;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.index.IndexBufferConsumer;
-import org.apache.usergrid.persistence.index.IndexBufferProducer;
import org.apache.usergrid.persistence.index.IndexFig;
import org.apache.usergrid.persistence.index.IndexOperationMessage;
import org.elasticsearch.action.ActionRequestBuilder;
@@ -37,7 +36,6 @@ import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.slf4j.Logger;
@@ -48,7 +46,6 @@ import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
-import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@@ -88,6 +85,10 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
this.consumer = Observable.create( new Observable.OnSubscribe<IndexOperationMessage>() {
@Override
public void call( final Subscriber<? super IndexOperationMessage> subscriber ) {
+
+ //name our thread so it's easy to see
+ Thread.currentThread().setName( "QueueConsumer_" + Thread.currentThread().getId() );
+
List<IndexOperationMessage> drainList;
do {
try {
@@ -130,7 +131,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
}
while ( true );
}
- } ).subscribeOn( Schedulers.io() ).buffer( config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS,
+ } ).subscribeOn( Schedulers.newThread() ).buffer( config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS,
config.getIndexBufferSize() ).doOnNext( new Action1<List<IndexOperationMessage>>() {
@Override
public void call( List<IndexOperationMessage> containerList ) {
@@ -157,37 +158,29 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
}
//process and flatten all the messages to builder requests
- Observable<ActionRequestBuilder> flattenMessages = Observable.from(operationMessages)
+ Observable<BatchRequest> flattenMessages = Observable.from(operationMessages)
.subscribeOn(Schedulers.io())
- .flatMap(new Func1<IndexOperationMessage, Observable<ActionRequestBuilder>>() {
+ .flatMap( new Func1<IndexOperationMessage, Observable<BatchRequest>>() {
@Override
- public Observable<ActionRequestBuilder> call(IndexOperationMessage operationMessage) {
- return Observable.from(operationMessage.getOperations());
+ public Observable<BatchRequest> call( IndexOperationMessage operationMessage ) {
+ return Observable.from( operationMessage.getOperations() );
}
- });
+ } );
//batch shard operations into a bulk request
flattenMessages
.buffer(config.getIndexBatchSize())
- .doOnNext(new Action1<List<ActionRequestBuilder>>() {
+ .doOnNext(new Action1<List<BatchRequest>>() {
@Override
- public void call(List<ActionRequestBuilder> builders) {
+ public void call(List<BatchRequest> builders) {
try {
final BulkRequestBuilder bulkRequest = initRequest();
- for (ActionRequestBuilder builder : builders) {
+ for (BatchRequest builder : builders) {
indexSizeCounter.dec();
- if (builder instanceof IndexRequestBuilder) {
- bulkRequest.add((IndexRequestBuilder) builder);
- }
- if (builder instanceof DeleteRequestBuilder) {
- bulkRequest.add((DeleteRequestBuilder) builder);
- }
- if(builder instanceof DeleteByQueryRequestBuilder){
- DeleteByQueryRequestBuilder deleteByQueryRequestBuilder = (DeleteByQueryRequestBuilder) builder;
- deleteByQueryRequestBuilder.get();
- }
+
+ builder.doOperation( client, bulkRequest );
}
sendRequest(bulkRequest);
}catch (Exception e){
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java
new file mode 100644
index 0000000..381d005
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.index.impl;
+
+
+import java.util.Map;
+
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.Client;
+
+
+/**
+ * Represent the properties required to build an index request
+ */
+public class IndexRequest implements BatchRequest {
+
+ public final String writeAlias;
+ public final String entityType;
+ public final String documentId;
+
+ public final Map<String, Object> data;
+
+
+ public IndexRequest( final String writeAlias, final String entityType, final String documentId,
+ final Map<String, Object> data ) {
+ this.writeAlias = writeAlias;
+ this.entityType = entityType;
+ this.documentId = documentId;
+ this.data = data;
+ }
+
+
+ public void doOperation(final Client client, final BulkRequestBuilder bulkRequest ){
+ IndexRequestBuilder builder =
+ client.prepareIndex(writeAlias, entityType, documentId).setSource( data );
+
+
+ bulkRequest.add( builder );
+
+ }
+
+
+ public String getWriteAlias() {
+ return writeAlias;
+ }
+
+
+ public String getEntityType() {
+ return entityType;
+ }
+
+
+ public String getDocumentId() {
+ return documentId;
+ }
+
+
+ public Map<String, Object> getData() {
+ return data;
+ }
+
+
+ @Override
+ public boolean equals( final Object o ) {
+ if ( this == o ) {
+ return true;
+ }
+ if ( o == null || getClass() != o.getClass() ) {
+ return false;
+ }
+
+ final IndexRequest that = ( IndexRequest ) o;
+
+ if ( !data.equals( that.data ) ) {
+ return false;
+ }
+ if ( !documentId.equals( that.documentId ) ) {
+ return false;
+ }
+ if ( !entityType.equals( that.entityType ) ) {
+ return false;
+ }
+ if ( !writeAlias.equals( that.writeAlias ) ) {
+ return false;
+ }
+
+ return true;
+ }
+
+
+ @Override
+ public int hashCode() {
+ int result = writeAlias.hashCode();
+ result = 31 * result + entityType.hashCode();
+ result = 31 * result + documentId.hashCode();
+ result = 31 * result + data.hashCode();
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
index 7e2312d..57c2fab 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
@@ -23,7 +23,8 @@ import org.apache.usergrid.persistence.collection.guice.CollectionModule;
import org.apache.usergrid.persistence.core.guice.TestModule;
import org.apache.usergrid.persistence.core.guice.CommonModule;
import org.apache.usergrid.persistence.index.impl.BufferQueue;
-import org.apache.usergrid.persistence.index.impl.BufferQueueInMemory;
+import org.apache.usergrid.persistence.index.impl.BufferQueueInMemoryImpl;
+import org.apache.usergrid.persistence.index.impl.BufferQueueSQSImpl;
public class TestIndexModule extends TestModule {
@@ -37,7 +38,8 @@ public class TestIndexModule extends TestModule {
install( new IndexModule() {
@Override
public void wireBufferQueue() {
- bind( BufferQueue.class).to( BufferQueueInMemory.class );
+ bind( BufferQueue.class).to( BufferQueueInMemoryImpl.class );
+// bind( BufferQueue.class).to( BufferQueueSQSImpl.class );
}
} );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
index 223860e..dd044d2 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
@@ -29,8 +29,8 @@ public interface QueueManager {
/**
* Read messages from queue
* @param limit
- * @param transactionTimeout timeout in ms
- * @param waitTime wait time for next message in ms
+ * @param transactionTimeout timeout in seconds
+ * @param waitTime wait time for next message in milliseconds
* @param klass class to cast the return from
* @return List of Queue Messages
*/