You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/03/17 00:37:38 UTC
[10/50] incubator-usergrid git commit: Changed consumer structure of
buffer/wait timeout. This now happens implicitly in our queue take,
and is no longer necessary. .
Changed consumer structure of buffer/wait timeout. This now happens implicitly in our queue take, and is no longer necessary.
.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9111d944
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9111d944
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9111d944
Branch: refs/heads/two-dot-o
Commit: 9111d94481bc15490b477f9ca48dd2565ca0e9dd
Parents: c47f32a
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 10 17:25:23 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 10 17:25:23 2015 -0600
----------------------------------------------------------------------
.../usergrid/corepersistence/CoreModule.java | 9 ++-
.../usergrid/persistence/index/IndexFig.java | 47 ++++++-----
.../index/IndexOperationMessage.java | 83 ++++++++++++--------
.../index/impl/BufferQueueInMemoryImpl.java | 5 +-
.../index/impl/BufferQueueSQSImpl.java | 54 +++++--------
.../persistence/index/impl/DeIndexRequest.java | 5 ++
.../index/impl/EsEntityIndexBatchImpl.java | 12 ++-
.../index/impl/EsIndexBufferConsumerImpl.java | 39 +++++----
.../index/impl/EsIndexBufferProducerImpl.java | 3 +-
.../persistence/index/impl/IndexRequest.java | 24 ++++--
.../impl/EntityConnectionIndexImplTest.java | 4 +-
.../queue/impl/SQSQueueManagerImpl.java | 8 +-
12 files changed, 174 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index 8d99586..7b53d67 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -34,6 +34,8 @@ import org.apache.usergrid.persistence.core.guice.CommonModule;
import org.apache.usergrid.persistence.core.migration.data.DataMigration;
import org.apache.usergrid.persistence.graph.guice.GraphModule;
import org.apache.usergrid.persistence.index.guice.IndexModule;
+import org.apache.usergrid.persistence.index.impl.BufferQueue;
+import org.apache.usergrid.persistence.index.impl.BufferQueueSQSImpl;
import org.apache.usergrid.persistence.map.guice.MapModule;
import org.apache.usergrid.persistence.queue.guice.QueueModule;
import org.slf4j.Logger;
@@ -69,7 +71,12 @@ public class CoreModule extends AbstractModule {
install( new CommonModule());
install(new CollectionModule());
install(new GraphModule());
- install(new IndexModule());
+ install( new IndexModule() {
+ @Override
+ public void wireBufferQueue() {
+ bind(BufferQueue.class).to( BufferQueueSQSImpl.class );
+ }
+ } );
install(new MapModule());
install(new QueueModule());
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index ce14449..cde86fd 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -18,6 +18,7 @@
*/
package org.apache.usergrid.persistence.index;
+
import org.safehaus.guicyfig.Default;
import org.safehaus.guicyfig.FigSingleton;
import org.safehaus.guicyfig.GuicyFig;
@@ -55,8 +56,16 @@ public interface IndexFig extends GuicyFig {
public static final String INDEX_BUFFER_TIMEOUT = "elasticsearch.buffer_timeout";
+ /**
+ * Amount of time to wait when reading from the queue
+ */
public static final String INDEX_QUEUE_READ_TIMEOUT = "elasticsearch.queue_read_timeout";
+ /**
+ * Amount of time to wait when reading from the queue in milliseconds
+ */
+ public static final String INDEX_QUEUE_TRANSACTION_TIMEOUT = "elasticsearch.queue_transaction_timeout";
+
public static final String INDEX_BATCH_SIZE = "elasticsearch.batch_size";
public static final String INDEX_WRITE_CONSISTENCY_LEVEL = "elasticsearch.write_consistency_level";
@@ -67,9 +76,10 @@ public interface IndexFig extends GuicyFig {
public static final String ELASTICSEARCH_FAIL_REFRESH = "elasticsearch.fail_refresh";
/**
- * Amount of time in milliseconds to wait when ES rejects our request before retrying. Provides simple backpressure
+ * Amount of time in milliseconds to wait when ES rejects our request before retrying. Provides simple
+ * backpressure
*/
- public static final String FAILURE_REJECTED_RETRY_WAIT_TIME = "elasticsearch.rejected_retry_wait";
+ public static final String FAILURE_REJECTED_RETRY_WAIT_TIME = "elasticsearch.rejected_retry_wait";
public static final String QUERY_LIMIT_DEFAULT = "index.query.limit.default";
@@ -82,7 +92,7 @@ public interface IndexFig extends GuicyFig {
int getPort();
@Default( "usergrid" )
- @Key( ELASTICSEARCH_CLUSTER_NAME)
+ @Key( ELASTICSEARCH_CLUSTER_NAME )
String getClusterName();
@Default( "usergrid" ) // no underbars allowed
@@ -111,15 +121,15 @@ public interface IndexFig extends GuicyFig {
public boolean isForcedRefresh();
/** Identify the client node with a unique name. */
- @Default("default")
+ @Default( "default" )
@Key( ELASTICSEARCH_NODENAME )
public String getNodeName();
- @Default("6")
+ @Default( "6" )
@Key( ELASTICSEARCH_NUMBER_OF_SHARDS )
public int getNumberOfShards();
- @Default("1")
+ @Default( "1" )
@Key( ELASTICSEARCH_NUMBER_OF_REPLICAS )
public int getNumberOfReplicas();
@@ -127,51 +137,48 @@ public interface IndexFig extends GuicyFig {
@Key( ELASTICSEARCH_FAIL_REFRESH )
int getFailRefreshCount();
- @Default("2")
+ @Default( "2" )
int getIndexCacheMaxWorkers();
/**
* how long to wait before the buffer flushes to send
- * @return
*/
- @Default("250")
+ @Default( "250" )
@Key( INDEX_BUFFER_TIMEOUT )
long getIndexBufferTimeout();
/**
* size of the buffer to build up before you send results
- * @return
*/
- @Default("1000")
+ @Default( "1000" )
@Key( INDEX_BUFFER_SIZE )
int getIndexBufferSize();
/**
* size of the buffer to build up before you send results
- * @return
*/
- @Default("1000")
+ @Default( "1000" )
@Key( INDEX_QUEUE_SIZE )
int getIndexQueueSize();
/**
* Request batch size for ES
- * @return
*/
- @Default("1000")
- @Key( INDEX_BATCH_SIZE)
+ @Default( "1000" )
+ @Key( INDEX_BATCH_SIZE )
int getIndexBatchSize();
- @Default("one")
+ @Default( "one" )
@Key( INDEX_WRITE_CONSISTENCY_LEVEL )
String getWriteConsistencyLevel();
- @Default("1000")
+ @Default( "1000" )
@Key( FAILURE_REJECTED_RETRY_WAIT_TIME )
long getFailureRetryTime();
//give us 60 seconds to process the message
- @Default("60")
- @Key(INDEX_QUEUE_READ_TIMEOUT)
+ @Default( "60" )
+ @Key( INDEX_QUEUE_READ_TIMEOUT )
int getIndexQueueTimeout();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
index 43eaa01..7d8a859 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
@@ -16,69 +16,84 @@
*/
package org.apache.usergrid.persistence.index;
-import org.apache.usergrid.persistence.core.future.BetterFuture;
-import org.apache.usergrid.persistence.index.impl.BatchRequest;
-
-import org.elasticsearch.action.ActionRequestBuilder;
-import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
import java.io.Serializable;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.usergrid.persistence.core.future.BetterFuture;
+import org.apache.usergrid.persistence.index.impl.DeIndexRequest;
+import org.apache.usergrid.persistence.index.impl.IndexRequest;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
/**
* Container for index operations.
*/
-public class IndexOperationMessage implements Serializable {
- private final Set<BatchRequest> builders;
+public class IndexOperationMessage implements Serializable {
+ private final Set<IndexRequest> indexRequests;
+ private final Set<DeIndexRequest> deIndexRequests;
+
+
+
private final BetterFuture<IndexOperationMessage> containerFuture;
- public IndexOperationMessage(){
+
+ public IndexOperationMessage() {
final IndexOperationMessage parent = this;
- this.builders = new HashSet<>();
- this.containerFuture = new BetterFuture<>(new Callable<IndexOperationMessage>() {
+ this.indexRequests = new HashSet<>();
+ this.deIndexRequests = new HashSet<>();
+ this.containerFuture = new BetterFuture<>( new Callable<IndexOperationMessage>() {
@Override
public IndexOperationMessage call() throws Exception {
return parent;
}
- });
+ } );
}
- /**
- * Add all our operations in the set
- * @param requests
- */
- public void setOperations(final Set<BatchRequest> requests){
- this.builders.addAll( requests);
+ public void addIndexRequest( final IndexRequest indexRequest ) {
+ indexRequests.add( indexRequest );
}
- /**
- * Add the operation to the set
- * @param builder
- */
- public void addOperation(BatchRequest builder){
- builders.add(builder);
+ public void addAllIndexRequest( final Set<IndexRequest> indexRequests ) {
+ indexRequests.addAll( indexRequests );
}
- /**
- * return operations for the message
- * @return
- */
- public Set<BatchRequest> getOperations(){
- return builders;
+
+ public void addDeIndexRequest( final DeIndexRequest deIndexRequest ) {
+ deIndexRequests.add( deIndexRequest );
+ }
+
+
+ public void addAllDeIndexRequest( final Set<DeIndexRequest> deIndexRequests ) {
+ deIndexRequests.addAll( deIndexRequests );
+ }
+
+
+ public Set<IndexRequest> getIndexRequests() {
+ return indexRequests;
+ }
+
+
+ public Set<DeIndexRequest> getDeIndexRequests() {
+ return deIndexRequests;
+ }
+
+
+ @JsonIgnore
+ public boolean isEmpty(){
+ return indexRequests.isEmpty() && deIndexRequests.isEmpty();
}
/**
* return the promise
- * @return
*/
- public BetterFuture<IndexOperationMessage> getFuture(){
+ @JsonIgnore
+ public BetterFuture<IndexOperationMessage> getFuture() {
return containerFuture;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
index ef0ef5f..1973e5d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
@@ -60,8 +60,11 @@ public class BufferQueueInMemoryImpl implements BufferQueue {
//loop until we're we're full or we time out
do {
try {
+
+ final long remaining = endTime - System.currentTimeMillis();
+
//we received 1, try to drain
- IndexOperationMessage polled = messages.poll( timeout, timeUnit );
+ IndexOperationMessage polled = messages.poll( remaining, timeUnit );
//drain
if ( polled != null ) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
index b814603..833e045 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
@@ -21,15 +21,11 @@ package org.apache.usergrid.persistence.index.impl;
import java.io.IOException;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
-import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import org.elasticsearch.action.ActionRequestBuilder;
-
import org.apache.usergrid.persistence.index.IndexFig;
import org.apache.usergrid.persistence.index.IndexOperationMessage;
import org.apache.usergrid.persistence.model.entity.SimpleId;
@@ -39,7 +35,6 @@ import org.apache.usergrid.persistence.queue.QueueMessage;
import org.apache.usergrid.persistence.queue.QueueScope;
import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
-import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -69,13 +64,10 @@ public class BufferQueueSQSImpl implements BufferQueue {
@Override
public void offer( final IndexOperationMessage operation ) {
- final Message toQueue = new Message( operation.getOperations() );
-
-
try {
- this.queue.sendMessage( toQueue );
+ this.queue.sendMessage( operation );
operation.getFuture().run();
}
catch ( IOException e ) {
@@ -87,19 +79,22 @@ public class BufferQueueSQSImpl implements BufferQueue {
@Override
public List<IndexOperationMessage> take( final int takeSize, final long timeout, final TimeUnit timeUnit ) {
- //loop until we're we're full or we time out
+ //SQS doesn't support more than 10
+
+ final int actualTake = Math.min( 10, takeSize );
+
List<QueueMessage> messages = queue
- .getMessages( takeSize, indexFig.getIndexQueueTimeout(), ( int ) timeUnit.toMillis( timeout ),
- Message.class );
+ .getMessages( actualTake, indexFig.getIndexQueueTimeout(), ( int ) timeUnit.toMillis( timeout ),
+ IndexOperationMessage.class );
final List<IndexOperationMessage> response = new ArrayList<>( messages.size() );
for ( final QueueMessage message : messages ) {
- SqsIndexOperationMessage operation = new SqsIndexOperationMessage( message );
+ final IndexOperationMessage messageBody = ( IndexOperationMessage ) message.getBody();
- operation.setOperations( ( ( Message ) message.getBody() ).getData() );
+ SqsIndexOperationMessage operation = new SqsIndexOperationMessage(message, messageBody );
response.add( operation );
}
@@ -111,10 +106,15 @@ public class BufferQueueSQSImpl implements BufferQueue {
@Override
public void ack( final List<IndexOperationMessage> messages ) {
+ //nothing to do
+ if(messages.size() == 0){
+ return;
+ }
+
List<QueueMessage> toAck = new ArrayList<>( messages.size() );
- for(IndexOperationMessage ioe: messages){
- toAck.add( ((SqsIndexOperationMessage)ioe).getMessage() );
+ for ( IndexOperationMessage ioe : messages ) {
+ toAck.add( ( ( SqsIndexOperationMessage ) ioe ).getMessage() );
}
queue.commitMessages( toAck );
@@ -122,22 +122,6 @@ public class BufferQueueSQSImpl implements BufferQueue {
/**
- * The message to queue to SQS
- */
- public static final class Message implements Serializable {
- private final Set<BatchRequest> data;
-
-
- private Message( final Set<BatchRequest> data ) {this.data = data;}
-
-
- public Set<BatchRequest> getData() {
- return data;
- }
- }
-
-
- /**
* The message that subclasses our IndexOperationMessage. holds a pointer to the original message
*/
public class SqsIndexOperationMessage extends IndexOperationMessage {
@@ -145,7 +129,11 @@ public class BufferQueueSQSImpl implements BufferQueue {
private final QueueMessage message;
- public SqsIndexOperationMessage( final QueueMessage message ) {this.message = message;}
+ public SqsIndexOperationMessage( final QueueMessage message, final IndexOperationMessage source ) {
+ this.message = message;
+ this.addAllDeIndexRequest( source.getDeIndexRequests() );
+ this.addAllIndexRequest( source.getIndexRequests() );
+ }
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
index a279f16..c63c4df 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
@@ -26,10 +26,15 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.client.Client;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
/**
* Represent the properties required to build an index request
*/
+@JsonTypeInfo(use=JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, property="@class")
public class DeIndexRequest implements BatchRequest {
public final String[] indexes;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index b0c731e..b63dfe6 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -127,7 +127,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
final String entityType = entity.getId().getType();
- container.addOperation(new IndexRequest(alias.getWriteAlias(), entityType, indexId, entityAsMap));
+ container.addIndexRequest(new IndexRequest(alias.getWriteAlias(), entityType, indexId, entityAsMap));
return this;
}
@@ -168,7 +168,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
indexes = new String[]{indexIdentifier.getIndex(null)};
}
- container.addOperation( new DeIndexRequest( indexes, entityType, indexId ) );
+ container.addDeIndexRequest( new DeIndexRequest( indexes, entityType, indexId ) );
log.debug("Deindexed Entity with index id " + indexId);
@@ -192,6 +192,14 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
public BetterFuture execute() {
IndexOperationMessage tempContainer = container;
container = new IndexOperationMessage();
+
+ /**
+ * No-op, just disregard it
+ */
+ if(tempContainer.isEmpty()){
+ return tempContainer.getFuture();
+ }
+
return indexBatchBufferProducer.put(tempContainer);
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 2342398..8547889 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -82,9 +82,9 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
final AtomicInteger countFail = new AtomicInteger();
//batch up sets of some size and send them in batch
- this.consumer = Observable.create( new Observable.OnSubscribe<IndexOperationMessage>() {
+ this.consumer = Observable.create( new Observable.OnSubscribe<List<IndexOperationMessage>>() {
@Override
- public void call( final Subscriber<? super IndexOperationMessage> subscriber ) {
+ public void call( final Subscriber<? super List<IndexOperationMessage>> subscriber ) {
//name our thread so it's easy to see
Thread.currentThread().setName( "QueueConsumer_" + Thread.currentThread().getId() );
@@ -99,21 +99,22 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
drainList = bufferQueue
.take( config.getIndexBufferSize(), config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS );
- for ( IndexOperationMessage drained : drainList ) {
- subscriber.onNext( drained );
- }
- bufferQueue.ack( drainList );
+ subscriber.onNext( drainList );
+
timer.stop();
countFail.set( 0 );
}
- catch( EsRejectedExecutionException err) {
+ catch ( EsRejectedExecutionException err ) {
countFail.incrementAndGet();
- log.error( "Elasticsearch rejected our request, sleeping for {} milliseconds before retrying. Failed {} consecutive times", config.getFailRefreshCount(), countFail.get() );
+ log.error(
+ "Elasticsearch rejected our request, sleeping for {} milliseconds before retrying. "
+ + "Failed {} consecutive times",
+ config.getFailRefreshCount(), countFail.get() );
- //es rejected the exception, sleep and retry in the queue
+ //es rejected the exception, sleep and retry in the queue
try {
Thread.sleep( config.getFailureRetryTime() );
}
@@ -131,8 +132,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
}
while ( true );
}
- } ).subscribeOn( Schedulers.newThread() ).buffer( config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS,
- config.getIndexBufferSize() ).doOnNext( new Action1<List<IndexOperationMessage>>() {
+ } ).subscribeOn( Schedulers.newThread() ).doOnNext( new Action1<List<IndexOperationMessage>>() {
@Override
public void call( List<IndexOperationMessage> containerList ) {
if ( containerList.size() > 0 ) {
@@ -142,7 +142,14 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
time.stop();
}
}
- } );
+} )
+ //ack after we process
+ .doOnNext( new Action1<List<IndexOperationMessage>>() {
+ @Override
+ public void call( final List<IndexOperationMessage> indexOperationMessages ) {
+ bufferQueue.ack( indexOperationMessages );
+ }
+ } );
//start in the background
consumer.subscribe();
@@ -163,15 +170,17 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
.flatMap( new Func1<IndexOperationMessage, Observable<BatchRequest>>() {
@Override
public Observable<BatchRequest> call( IndexOperationMessage operationMessage ) {
- return Observable.from( operationMessage.getOperations() );
+ final Observable<DeIndexRequest> deIndex = Observable.from( operationMessage.getDeIndexRequests () );
+ final Observable<IndexRequest> index = Observable.from( operationMessage.getIndexRequests() );
+
+ return Observable.merge( deIndex, index );
}
} );
//batch shard operations into a bulk request
- flattenMessages
- .buffer(config.getIndexBatchSize())
+ flattenMessages.toList()
.doOnNext(new Action1<List<BatchRequest>>() {
@Override
public void call(List<BatchRequest> builders) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
index db1f50e..61d5d25 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
@@ -54,7 +54,8 @@ public class EsIndexBufferProducerImpl implements IndexBufferProducer {
public BetterFuture put(IndexOperationMessage message){
Preconditions.checkNotNull(message, "Message cannot be null");
- indexSizeCounter.inc(message.getOperations().size());
+ indexSizeCounter.inc(message.getDeIndexRequests().size());
+ indexSizeCounter.inc(message.getIndexRequests().size());
Timer.Context time = timer.time();
bufferQueue.offer( message );
time.stop();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java
index 381d005..4ec4092 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java
@@ -26,17 +26,20 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
/**
* Represent the properties required to build an index request
*/
+@JsonTypeInfo( use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class" )
public class IndexRequest implements BatchRequest {
- public final String writeAlias;
- public final String entityType;
- public final String documentId;
+ public String writeAlias;
+ public String entityType;
+ public String documentId;
- public final Map<String, Object> data;
+ public Map<String, Object> data;
public IndexRequest( final String writeAlias, final String entityType, final String documentId,
@@ -48,13 +51,18 @@ public class IndexRequest implements BatchRequest {
}
- public void doOperation(final Client client, final BulkRequestBuilder bulkRequest ){
- IndexRequestBuilder builder =
- client.prepareIndex(writeAlias, entityType, documentId).setSource( data );
+ /**
+ * DO NOT DELETE! Required for Jackson
+ */
+ public IndexRequest() {
+ }
- bulkRequest.add( builder );
+ public void doOperation( final Client client, final BulkRequestBuilder bulkRequest ) {
+ IndexRequestBuilder builder = client.prepareIndex( writeAlias, entityType, documentId ).setSource( data );
+
+ bulkRequest.add( builder );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
index 37b5e90..215ff57 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
@@ -141,7 +141,7 @@ public class EntityConnectionIndexImplTest extends BaseIT {
EsTestUtils.waitForTasks(personLikesIndex);
- Thread.sleep( 1000 );
+ Thread.sleep( 30000 );
// now, let's search for muffins
CandidateResults likes = personLikesIndex
@@ -271,7 +271,7 @@ public class EntityConnectionIndexImplTest extends BaseIT {
personLikesIndex.refresh();
EsTestUtils.waitForTasks( personLikesIndex );
- Thread.sleep( 1000 );
+ Thread.sleep( 30000 );
// now, let's search for muffins
CandidateResults likes = personLikesIndex.search( searchScope,
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index f202fda..10aa621 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -95,7 +95,8 @@ public class SQSQueueManagerImpl implements QueueManager {
sqs.setRegion(region);
smileFactory.delegateToTextual(true);
mapper = new ObjectMapper( smileFactory );
- mapper.enable(SerializationFeature.INDENT_OUTPUT);
+ //pretty print, disabling for speed
+// mapper.enable(SerializationFeature.INDENT_OUTPUT);
mapper.enableDefaultTypingAsProperty(ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class");
} catch ( Exception e ) {
LOG.warn("failed to setup SQS",e);
@@ -180,7 +181,10 @@ public class SQSQueueManagerImpl implements QueueManager {
}
String url = getQueue().getUrl();
LOG.info("Sending Message...{} to {}",body.toString(),url);
- SendMessageRequest request = new SendMessageRequest(url,toString((Serializable)body));
+
+ final String stringBody = toString(body);
+
+ SendMessageRequest request = new SendMessageRequest(url, stringBody);
sqs.sendMessage(request);
}