You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/03/17 00:37:35 UTC
[07/50] incubator-usergrid git commit: Added buffer wiring to guice
and updated the tests.
Added buffer wiring to guice and updated the tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/cd0015d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/cd0015d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/cd0015d5
Branch: refs/heads/two-dot-o
Commit: cd0015d5af1371528943e015022be21b2d8099f9
Parents: c5a4767
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 10 14:56:47 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 10 14:56:47 2015 -0600
----------------------------------------------------------------------
.../usergrid/persistence/index/IndexFig.java | 9 ++++++++
.../persistence/index/guice/IndexModule.java | 10 ++++++++-
.../persistence/index/impl/BufferQueue.java | 7 ++++++
.../index/impl/BufferQueueInMemory.java | 18 ++++++++-------
.../index/impl/EsIndexBufferConsumerImpl.java | 23 +++++++++++++++-----
.../index/impl/EsIndexBufferProducerImpl.java | 2 +-
.../index/guice/TestIndexModule.java | 9 +++++++-
7 files changed, 62 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd0015d5/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index c6f08f6..befbaa9 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -64,6 +64,11 @@ public interface IndexFig extends GuicyFig {
*/
public static final String ELASTICSEARCH_FAIL_REFRESH = "elasticsearch.fail_refresh";
+ /**
+ * Amount of time in milliseconds to wait when ES rejects our request before retrying. Provides simple backpressure
+ */
+ public static final String FAILURE_REJECTED_RETRY_WAIT_TIME = "elasticsearch.rejected_retry_wait";
+
public static final String QUERY_LIMIT_DEFAULT = "index.query.limit.default";
@Default( "127.0.0.1" )
@@ -158,4 +163,8 @@ public interface IndexFig extends GuicyFig {
@Default("one")
@Key( INDEX_WRITE_CONSISTENCY_LEVEL )
String getWriteConsistencyLevel();
+
+ @Default("1000")
+ @Key( FAILURE_REJECTED_RETRY_WAIT_TIME )
+ long getFailureRetryTime();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd0015d5/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
index ebd9098..d911dab 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
@@ -32,7 +32,7 @@ import org.apache.usergrid.persistence.map.guice.MapModule;
import org.safehaus.guicyfig.GuicyFigModule;
-public class IndexModule extends AbstractModule {
+public abstract class IndexModule extends AbstractModule {
@Override
protected void configure() {
@@ -48,6 +48,14 @@ public class IndexModule extends AbstractModule {
bind(IndexBufferProducer.class).to(EsIndexBufferProducerImpl.class);
bind(IndexBufferConsumer.class).to(EsIndexBufferConsumerImpl.class).asEagerSingleton();
+ wireBufferQueue();
}
+
+ /**
+ * Wire the {@code BufferQueue} for this implementation
+ */
+ public abstract void wireBufferQueue();
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd0015d5/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java
index dec6ac3..ffc3b90 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java
@@ -46,4 +46,11 @@ public interface BufferQueue {
* @return
*/
public List<IndexOperationMessage> take(final int takeSize, final long timeout, final TimeUnit timeUnit );
+
+
+ /**
+ * Ack all messages so they do not appear again. Meant for transactional queues, and may or may not be implemented
+ * @param messages
+ */
+ public void ack(List<IndexOperationMessage> messages);
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd0015d5/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java
index 502f45d..403762f 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java
@@ -36,15 +36,11 @@ import com.google.inject.Singleton;
public class BufferQueueInMemory implements BufferQueue {
private final ArrayBlockingQueue<IndexOperationMessage> messages;
- private final IndexFig fig;
@Inject
- public BufferQueueInMemory( final ArrayBlockingQueue<IndexOperationMessage> messages, final IndexFig fig ) {
- this.messages = messages;
-
-
- this.fig = fig;
+ public BufferQueueInMemory(final IndexFig fig ) {
+ messages = new ArrayBlockingQueue<>( fig.getIndexQueueSize() );
}
@@ -78,8 +74,14 @@ public class BufferQueueInMemory implements BufferQueue {
}
}
- while ( response.size() < takeSize && System.currentTimeMillis() < endTime );
+ while ( response.size() < takeSize && System.currentTimeMillis() < endTime );
+
+ return response;
+ }
+
- return null;
+ @Override
+ public void ack( final List<IndexOperationMessage> messages ) {
+ //no op for this
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd0015d5/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 19b8438..45c12a1 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -39,6 +39,7 @@ import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
import org.elasticsearch.client.Client;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
@@ -70,9 +71,8 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
private final BufferQueue bufferQueue;
@Inject
- public EsIndexBufferConsumerImpl( final IndexFig config, final IndexBufferProducer producer, final EsProvider
- provider, final MetricsFactory metricsFactory,
- final BufferQueue bufferQueue ){
+ public EsIndexBufferConsumerImpl( final IndexFig config, final EsProvider
+ provider, final MetricsFactory metricsFactory, final BufferQueue bufferQueue ){
this.bufferQueue = bufferQueue;
this.flushTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "index.buffer.flush");
this.flushMeter = metricsFactory.getMeter(EsIndexBufferConsumerImpl.class, "index.buffer.meter");
@@ -101,17 +101,30 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
for ( IndexOperationMessage drained : drainList ) {
subscriber.onNext( drained );
}
- drainList.clear();
+
+ bufferQueue.ack( drainList );
+
timer.stop();
countFail.set( 0 );
}
+ catch( EsRejectedExecutionException err) {
+ countFail.incrementAndGet();
+ log.error( "Elasticsearch rejected our request, sleeping for {} milliseconds before retrying. Failed {} consecutive times", config.getFailRefreshCount(), countFail.get() );
+
+ //es rejected the request, sleep and retry in the queue
+ try {
+ Thread.sleep( config.getFailureRetryTime() );
+ }
+ catch ( InterruptedException e ) {
+ //swallow
+ }
+ }
catch ( Exception e ) {
int count = countFail.incrementAndGet();
log.error( "failed to dequeue", e );
if ( count > 200 ) {
log.error( "Shutting down index drain due to repetitive failures" );
- //break;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd0015d5/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
index f9999b2..db1f50e 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
@@ -46,7 +46,7 @@ public class EsIndexBufferProducerImpl implements IndexBufferProducer {
private final BufferQueue bufferQueue;
@Inject
- public EsIndexBufferProducerImpl( MetricsFactory metricsFactory, IndexFig fig, final BufferQueue bufferQueue ){
+ public EsIndexBufferProducerImpl( MetricsFactory metricsFactory, final BufferQueue bufferQueue ){
this.bufferQueue = bufferQueue;
this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferProducerImpl.class, "index.buffer.size");
this.timer = metricsFactory.getTimer(EsIndexBufferProducerImpl.class,"index.buffer.producer.timer");
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd0015d5/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
index 4d68dda..7e2312d 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
@@ -22,6 +22,8 @@ package org.apache.usergrid.persistence.index.guice;
import org.apache.usergrid.persistence.collection.guice.CollectionModule;
import org.apache.usergrid.persistence.core.guice.TestModule;
import org.apache.usergrid.persistence.core.guice.CommonModule;
+import org.apache.usergrid.persistence.index.impl.BufferQueue;
+import org.apache.usergrid.persistence.index.impl.BufferQueueInMemory;
public class TestIndexModule extends TestModule {
@@ -32,6 +34,11 @@ public class TestIndexModule extends TestModule {
// configure collections and our core astyanax framework
install( new CollectionModule() );
- install( new IndexModule() );
+ install( new IndexModule() {
+ @Override
+ public void wireBufferQueue() {
+ bind( BufferQueue.class).to( BufferQueueInMemory.class );
+ }
+ } );
}
}