You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/03/18 21:56:40 UTC
[40/50] incubator-usergrid git commit: Updated futures impl for
different queues
Updated futures impl for different queues
Added onError to catch errors
Added a gauge so we can track index operation in flight
Updated queue scope to only be a name, since we only use them at the system level.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/2d6ae369
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/2d6ae369
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/2d6ae369
Branch: refs/heads/USERGRID-460
Commit: 2d6ae3698e2faff733e73d17aff569a3850a6485
Parents: 361060e
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Mar 13 16:19:40 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Mar 13 16:19:40 2015 -0600
----------------------------------------------------------------------
.../usergrid/corepersistence/CoreModule.java | 7 +-
.../corepersistence/StaleIndexCleanupTest.java | 4 +
.../persistence/core/future/BetterFuture.java | 43 ++++--
.../core/metrics/MetricsFactory.java | 9 ++
.../core/metrics/MetricsFactoryImpl.java | 121 ++++++++++------
stack/corepersistence/queryindex/pom.xml | 8 +-
.../persistence/index/IndexBufferConsumer.java | 11 ++
.../usergrid/persistence/index/IndexFig.java | 11 ++
.../index/IndexOperationMessage.java | 7 +-
.../persistence/index/guice/IndexModule.java | 11 +-
.../persistence/index/impl/BufferQueue.java | 22 ++-
.../index/impl/BufferQueueInMemoryImpl.java | 53 ++++---
.../index/impl/BufferQueueSQSImpl.java | 8 +-
.../index/impl/EsEntityIndexImpl.java | 142 ++-----------------
.../index/impl/EsIndexBufferConsumerImpl.java | 131 ++++++++++-------
.../index/guice/TestIndexModule.java | 8 +-
.../index/impl/BufferQueueSQSImplTest.java | 5 +
.../impl/EntityConnectionIndexImplTest.java | 3 -
.../persistence/index/impl/EntityIndexTest.java | 1 -
.../persistence/index/impl/EsTestUtils.java | 48 -------
.../cassandra/ManagementServiceImpl.java | 9 +-
.../notifications/NotificationsService.java | 50 ++++---
22 files changed, 359 insertions(+), 353 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index 3230faa..161037b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -71,12 +71,7 @@ public class CoreModule extends AbstractModule {
install( new CommonModule());
install(new CollectionModule());
install(new GraphModule());
- install( new IndexModule() {
- @Override
- public void wireBufferQueue() {
- bind(BufferQueue.class).to( BufferQueueSQSImpl.class );
- }
- } );
+ install( new IndexModule() );
// install(new MapModule()); TODO, re-enable when index module doesn't depend on queue
// install(new QueueModule());
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
index 82d8f93..8d31b69 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
@@ -310,6 +310,10 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
em.delete( thing );
}
+
+ //put this into the top of the queue, once it's acked we've been flushed
+ em.refreshIndex();
+
// wait for indexes to be cleared for the deleted entities
count = 0;
do {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java
index 201fa9a..777ac52 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java
@@ -16,28 +16,53 @@
*/
package org.apache.usergrid.persistence.core.future;
+
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
+
/**
* Future without the exception nastiness
*/
-public class BetterFuture<T> extends FutureTask<T> {
- public BetterFuture(Callable<T> callable){
- super(callable);
+public class BetterFuture<T> extends FutureTask<T> {
+
+ private Throwable error;
+
+
+ public BetterFuture( Callable<T> callable ) {
+ super( callable );
}
- public void done(){
+
+ public void setError( final Throwable t ) {
+ this.error = t;
+ }
+
+
+ public void done() {
run();
}
- public T get(){
+
+ public T get() {
+
+ T returnValue = null;
+
try {
- return super.get();
- }catch (Exception e){
- throw new RuntimeException(e);
+ returnValue = super.get();
+ }
+ catch ( InterruptedException e ) {
+ //swallow
+ }
+ catch ( ExecutionException e ) {
+ //swallow
}
- }
+ if ( error != null ) {
+ throw new RuntimeException( "Error in getting future", error );
+ }
+ return returnValue;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactory.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactory.java
index 453e556..62a5cb9 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactory.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactory.java
@@ -31,4 +31,13 @@ public interface MetricsFactory {
Counter getCounter(Class<?> klass, String name);
Meter getMeter(Class<?> klass, String name);
+
+ /**
+ * Register the supplied gauge under a name built from the given class and name
+ * @param clazz
+ * @param name
+ * @param gauge
+ * @return
+ */
+ void addGauge( Class<?> clazz, String name, Gauge<?> gauge );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactoryImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactoryImpl.java
index 6d0881b..904e56a 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactoryImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactoryImpl.java
@@ -17,17 +17,27 @@
package org.apache.usergrid.persistence.core.metrics;
-import com.codahale.metrics.*;
+import java.net.InetSocketAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
import com.codahale.metrics.graphite.Graphite;
import com.codahale.metrics.graphite.GraphiteReporter;
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.net.InetSocketAddress;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
/**
* Singleton class to manage metrics.
@@ -39,74 +49,101 @@ public class MetricsFactoryImpl implements MetricsFactory {
private MetricRegistry registry;
private GraphiteReporter graphiteReporter;
private JmxReporter jmxReporter;
- private ConcurrentHashMap<String,Metric> hashMap;
- private static final Logger LOG = LoggerFactory.getLogger(MetricsFactoryImpl.class);
+ private ConcurrentHashMap<String, Metric> hashMap;
+ private static final Logger LOG = LoggerFactory.getLogger( MetricsFactoryImpl.class );
+
@Inject
- public MetricsFactoryImpl(MetricsFig metricsFig) {
+ public MetricsFactoryImpl( MetricsFig metricsFig ) {
registry = new MetricRegistry();
String metricsHost = metricsFig.getHost();
- if(!metricsHost.equals("false")) {
- Graphite graphite = new Graphite(new InetSocketAddress(metricsHost, 2003));
- graphiteReporter = GraphiteReporter.forRegistry(registry)
- .prefixedWith("usergrid-metrics")
- .convertRatesTo(TimeUnit.SECONDS)
- .convertDurationsTo(TimeUnit.MILLISECONDS)
- .filter(MetricFilter.ALL)
- .build(graphite);
- graphiteReporter.start(30, TimeUnit.SECONDS);
- }else {
- LOG.warn("MetricsService:Logger not started.");
+ if ( !metricsHost.equals( "false" ) ) {
+ Graphite graphite = new Graphite( new InetSocketAddress( metricsHost, 2003 ) );
+ graphiteReporter = GraphiteReporter.forRegistry( registry ).prefixedWith( "usergrid-metrics" )
+ .convertRatesTo( TimeUnit.SECONDS )
+ .convertDurationsTo( TimeUnit.MILLISECONDS ).filter( MetricFilter.ALL )
+ .build( graphite );
+ graphiteReporter.start( 30, TimeUnit.SECONDS );
+ }
+ else {
+ LOG.warn( "MetricsService:Logger not started." );
}
hashMap = new ConcurrentHashMap<String, Metric>();
- jmxReporter = JmxReporter.forRegistry(registry).build();
+ jmxReporter = JmxReporter.forRegistry( registry ).build();
jmxReporter.start();
}
+
@Override
public MetricRegistry getRegistry() {
return registry;
}
+
@Override
- public Timer getTimer(Class<?> klass, String name) {
- return getMetric(Timer.class, klass, name);
+ public Timer getTimer( Class<?> klass, String name ) {
+ return getMetric( Timer.class, klass, name );
}
+
@Override
- public Histogram getHistogram(Class<?> klass, String name) {
- return getMetric(Histogram.class, klass, name);
+ public Histogram getHistogram( Class<?> klass, String name ) {
+ return getMetric( Histogram.class, klass, name );
}
+
@Override
- public Counter getCounter(Class<?> klass, String name) {
- return getMetric(Counter.class, klass, name);
+ public Counter getCounter( Class<?> klass, String name ) {
+ return getMetric( Counter.class, klass, name );
}
+
@Override
- public Meter getMeter(Class<?> klass, String name) {
- return getMetric(Meter.class, klass, name);
+ public Meter getMeter( Class<?> klass, String name ) {
+ return getMetric( Meter.class, klass, name );
}
- private <T> T getMetric(Class<T> metricClass, Class<?> klass, String name) {
+
+ @Override
+ public void addGauge( final Class<?> clazz, final String name, final Gauge<?> gauge ) {
+
+ this.getRegistry().register( MetricRegistry.name( clazz, name ), gauge );
+ }
+
+
+ private <T> T getMetric( Class<T> metricClass, Class<?> klass, String name ) {
String key = metricClass.getName() + klass.getName() + name;
- Metric metric = hashMap.get(key);
- if (metric == null) {
- if (metricClass == Histogram.class) {
- metric = this.getRegistry().histogram(MetricRegistry.name(klass, name));
+ Metric metric = hashMap.get( key );
+ if ( metric == null ) {
+ if ( metricClass == Histogram.class ) {
+ metric = this.getRegistry().histogram( MetricRegistry.name( klass, name ) );
}
- if (metricClass == Timer.class) {
- metric = this.getRegistry().timer(MetricRegistry.name(klass, name));
+ if ( metricClass == Timer.class ) {
+ metric = this.getRegistry().timer( MetricRegistry.name( klass, name ) );
}
- if (metricClass == Meter.class) {
- metric = this.getRegistry().meter(MetricRegistry.name(klass, name));
+ if ( metricClass == Meter.class ) {
+ metric = this.getRegistry().meter( MetricRegistry.name( klass, name ) );
}
- if (metricClass == Counter.class) {
- metric = this.getRegistry().counter(MetricRegistry.name(klass, name));
+ if ( metricClass == Counter.class ) {
+ metric = this.getRegistry().counter( MetricRegistry.name( klass, name ) );
}
- hashMap.put(key, metric);
+
+
+ hashMap.put( key, metric );
}
- return (T) metric;
+ return ( T ) metric;
+ }
+
+
+ /**
+ *
+ * @param metricClass
+ * @param klass
+ * @param name
+ * @return
+ */
+ private String getKey( Class<?> metricClass, Class<?> klass, String name ) {
+ return metricClass.getName() + klass.getName() + name;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/corepersistence/queryindex/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/pom.xml b/stack/corepersistence/queryindex/pom.xml
index af843ad..5f01ee7 100644
--- a/stack/corepersistence/queryindex/pom.xml
+++ b/stack/corepersistence/queryindex/pom.xml
@@ -139,7 +139,13 @@
<classifier>tests</classifier>
</dependency>
-
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>queue</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.slf4j</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferConsumer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferConsumer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferConsumer.java
index ac7489c..40c7852 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferConsumer.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferConsumer.java
@@ -23,4 +23,15 @@ package org.apache.usergrid.persistence.index;
* Classy class class.
*/
public interface IndexBufferConsumer {
+
+
+ /**
+ * Start the consumer
+ */
+ public void start();
+
+ /**
+ * Stop the consumers
+ */
+ public void stop();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index 445789f..6be8234 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -24,6 +24,8 @@ import org.safehaus.guicyfig.FigSingleton;
import org.safehaus.guicyfig.GuicyFig;
import org.safehaus.guicyfig.Key;
+import org.apache.usergrid.persistence.index.guice.QueueProvider;
+
@FigSingleton
public interface IndexFig extends GuicyFig {
@@ -86,6 +88,11 @@ public interface IndexFig extends GuicyFig {
*/
public static final String ELASTICSEARCH_WORKER_COUNT = "elasticsearch.worker_count";
+ /**
+ * The queue implementation to use. Values come from <class>QueueProvider.Implementations</class>
+ */
+ public static final String ELASTICSEARCH_QUEUE_IMPL = "elasticsearch.queue_impl";
+
public static final String QUERY_LIMIT_DEFAULT = "index.query.limit.default";
@Default( "127.0.0.1" )
@@ -190,4 +197,8 @@ public interface IndexFig extends GuicyFig {
@Key( ELASTICSEARCH_WORKER_COUNT )
int getWorkerCount();
+ @Default( "LOCAL" )
+ @Key( ELASTICSEARCH_QUEUE_IMPL )
+ String getQueueImplementation();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
index 33b68cd..5686e26 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
@@ -128,6 +128,11 @@ public class IndexOperationMessage implements Serializable {
}
public void done() {
- getFuture().done();
+ //if this has been serialized, it could be null. don't NPE if it is, there's nothing to ack
+ final BetterFuture<IndexOperationMessage> future = getFuture();
+
+ if(future != null ){
+ future.done();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
index b03e1c0..95f3bd4 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
@@ -23,6 +23,7 @@ import org.apache.usergrid.persistence.index.*;
import com.google.inject.AbstractModule;
import com.google.inject.assistedinject.FactoryModuleBuilder;
+import org.apache.usergrid.persistence.index.impl.BufferQueue;
import org.apache.usergrid.persistence.index.impl.EsEntityIndexFactoryImpl;
import org.apache.usergrid.persistence.index.impl.EsEntityIndexImpl;
import org.apache.usergrid.persistence.index.impl.EsIndexBufferConsumerImpl;
@@ -33,7 +34,7 @@ import org.apache.usergrid.persistence.queue.guice.QueueModule;
import org.safehaus.guicyfig.GuicyFigModule;
-public abstract class IndexModule extends AbstractModule {
+public class IndexModule extends AbstractModule {
@Override
protected void configure() {
@@ -50,14 +51,10 @@ public abstract class IndexModule extends AbstractModule {
bind(IndexBufferProducer.class).to(EsIndexBufferProducerImpl.class);
bind(IndexBufferConsumer.class).to(EsIndexBufferConsumerImpl.class).asEagerSingleton();
- wireBufferQueue();
- }
+ bind( BufferQueue.class).toProvider( QueueProvider.class );
+ }
- /**
- * Write the <class>BufferQueue</class> for this implementation
- */
- public abstract void wireBufferQueue();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java
index ffc3b90..76b49c2 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java
@@ -32,25 +32,37 @@ import org.apache.usergrid.persistence.index.IndexOperationMessage;
public interface BufferQueue {
/**
- * Offer the indexoperation message
+ * Offer the indexoperation message. Some queues may support not returning the future until ack or fail.
+ * Other queues may return the future after ack on the offer. See the implementation documentation for details.
* @param operation
*/
public void offer(final IndexOperationMessage operation);
/**
- * Perform a take, potentially blocking. Until takesize is available, or timeout has elapsed
+ * Perform a take, potentially blocking until up to takesize is available, or timeout has elapsed.
+ * May return less than the take size, but will never return null
+ *
* @param takeSize
* @param timeout
* @param timeUnit
- * @return
+ * @return A null safe list
*/
public List<IndexOperationMessage> take(final int takeSize, final long timeout, final TimeUnit timeUnit );
/**
- * Ack all messages so they do not appear again. Meant for transactional queues, and may or may not be implemented
+ * Ack all messages so they do not appear again. Meant for transactional queues, and may or may not be implemented.
+ * This will set the future as done in in-memory operations
+ *
* @param messages
*/
- public void ack(List<IndexOperationMessage> messages);
+ public void ack(final List<IndexOperationMessage> messages);
+
+ /**
+ * Mark these messages as failed. Set the exception in the future on local operation
+ *
+ * @param messages
+ */
+ public void fail(final List<IndexOperationMessage> messages, final Throwable t);
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
index 6716fd1..998c086 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
+import org.apache.usergrid.persistence.core.future.BetterFuture;
import org.apache.usergrid.persistence.index.IndexFig;
import org.apache.usergrid.persistence.index.IndexOperationMessage;
@@ -47,7 +48,6 @@ public class BufferQueueInMemoryImpl implements BufferQueue {
@Override
public void offer( final IndexOperationMessage operation ) {
messages.offer( operation );
- operation.done();
}
@@ -55,30 +55,30 @@ public class BufferQueueInMemoryImpl implements BufferQueue {
public List<IndexOperationMessage> take( final int takeSize, final long timeout, final TimeUnit timeUnit ) {
final List<IndexOperationMessage> response = new ArrayList<>( takeSize );
+ try {
- final long endTime = System.currentTimeMillis() + timeUnit.toMillis( timeout );
- //loop until we're we're full or we time out
- do {
- try {
+ messages.drainTo( response, takeSize );
- final long remaining = endTime - System.currentTimeMillis();
+ //we got something, go process it
+ if ( response.size() > 0 ) {
+ return response;
+ }
- //we received 1, try to drain
- IndexOperationMessage polled = messages.poll( remaining, timeUnit );
- //drain
- if ( polled != null ) {
- response.add( polled );
- messages.drainTo( response, takeSize - response.size() );
- }
- }
- catch ( InterruptedException ie ) {
- //swallow
+ final IndexOperationMessage polled = messages.poll( timeout, timeUnit );
+ if ( polled != null ) {
+ response.add( polled );
+
+ //try to add more
+ messages.drainTo( response, takeSize - 1 );
}
}
- while ( response.size() < takeSize && System.currentTimeMillis() < endTime );
+ catch ( InterruptedException e ) {
+ //swallow
+ }
+
return response;
}
@@ -86,6 +86,23 @@ public class BufferQueueInMemoryImpl implements BufferQueue {
@Override
public void ack( final List<IndexOperationMessage> messages ) {
- //no op for this
+ //if we have a future ack it
+ for ( final IndexOperationMessage op : messages ) {
+ op.done();
+ }
+ }
+
+
+ @Override
+ public void fail( final List<IndexOperationMessage> messages, final Throwable t ) {
+
+
+ for ( final IndexOperationMessage op : messages ) {
+ final BetterFuture<IndexOperationMessage> future = op.getFuture();
+
+ if ( future != null ) {
+ future.setError( t );
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
index 4a07704..b8d162b 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
@@ -101,7 +101,7 @@ public class BufferQueueSQSImpl implements BufferQueue {
public BufferQueueSQSImpl( final QueueManagerFactory queueManagerFactory, final IndexFig indexFig,
final MapManagerFactory mapManagerFactory, final MetricsFactory metricsFactory ) {
final QueueScope queueScope =
- new QueueScopeImpl( new SimpleId( MANAGEMENT_APPLICATION_ID, "application" ), QUEUE_NAME );
+ new QueueScopeImpl( QUEUE_NAME );
this.queue = queueManagerFactory.getQueueManager( queueScope );
this.indexFig = indexFig;
@@ -260,6 +260,12 @@ public class BufferQueueSQSImpl implements BufferQueue {
}
+ @Override
+ public void fail( final List<IndexOperationMessage> messages, final Throwable t ) {
+ //no op, just let it retry after the queue timeout
+ }
+
+
/** Read the object from Base64 string. */
private IndexOperationMessage fromString( String s ) throws IOException {
IndexOperationMessage o = mapper.readValue( s, IndexOperationMessage.class );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index fa50734..8be044f 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -47,6 +47,8 @@ import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.ShardOperationFailedException;
@@ -111,8 +113,6 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
private final Timer addWriteAliasTimer;
private final Timer addReadAliasTimer;
private final Timer searchTimer;
- private final Timer allVersionsTimerFuture;
- private final Timer deletePreviousTimerFuture;
/**
* We purposefully make this per instance. Some indexes may work, while others may fail
@@ -127,7 +127,6 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
private final IndexFig config;
- private final MetricsFactory metricsFactory;
//number of times to wait for the index to refresh properly.
@@ -148,8 +147,6 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
private Timer refreshTimer;
private Timer cursorTimer;
private Timer getVersionsTimer;
- private Timer allVersionsTimer;
- private Timer deletePreviousTimer;
private final MapManager mapManager;
@@ -168,11 +165,10 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
this.esProvider = provider;
this.config = config;
this.cursorTimeout = config.getQueryCursorTimeout();
- this.indexIdentifier = IndexingUtils.createIndexIdentifier(config, appScope);
+ this.indexIdentifier = IndexingUtils.createIndexIdentifier( config, appScope );
this.alias = indexIdentifier.getAlias();
this.failureMonitor = new FailureMonitorImpl( config, provider );
this.aliasCache = indexCache;
- this.metricsFactory = metricsFactory;
this.addTimer = metricsFactory
.getTimer( EsEntityIndexImpl.class, "es.entity.index.add.index.timer" );
this.removeAliasTimer = metricsFactory
@@ -191,14 +187,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
.getTimer( EsEntityIndexImpl.class, "es.entity.index.search.cursor.timer" );
this.getVersionsTimer =metricsFactory
.getTimer( EsEntityIndexImpl.class, "es.entity.index.get.versions.timer" );
- this.allVersionsTimer = metricsFactory
- .getTimer( EsEntityIndexImpl.class, "es.entity.index.delete.all.versions.timer" );
- this.deletePreviousTimer = metricsFactory
- .getTimer( EsEntityIndexImpl.class, "es.entity.index.delete.previous.versions.timer" );
- this.allVersionsTimerFuture = metricsFactory
- .getTimer( EsEntityIndexImpl.class, "es.entity.index.delete.all.versions.timer.future" );
- this.deletePreviousTimerFuture = metricsFactory
- .getTimer( EsEntityIndexImpl.class, "es.entity.index.delete.previous.versions.timer.future" );
+
final MapScope mapScope = new MapScopeImpl( appScope.getApplication(), "cursorcache" );
@@ -394,8 +383,8 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
*/
private void createMappings(final String indexName) throws IOException {
- XContentBuilder xcb = IndexingUtils.createDoubleStringIndexMapping(
- XContentFactory.jsonBuilder(), DEFAULT_TYPE );
+ XContentBuilder xcb = IndexingUtils.createDoubleStringIndexMapping( XContentFactory.jsonBuilder(),
+ DEFAULT_TYPE );
//Added For Graphite Metrics
@@ -421,7 +410,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
public CandidateResults search(final IndexScope indexScope, final SearchTypes searchTypes,
final Query query ) {
- final String context = IndexingUtils.createContextName(indexScope);
+ final String context = IndexingUtils.createContextName( indexScope );
final String[] entityTypes = searchTypes.getTypeNames();
QueryBuilder qb = query.createQueryBuilder(context);
@@ -632,7 +621,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
public int getPendingTasks() {
final PendingClusterTasksResponse tasksResponse = esProvider.getClient().admin()
- .cluster().pendingClusterTasks(new PendingClusterTasksRequest()).actionGet();
+ .cluster().pendingClusterTasks( new PendingClusterTasksRequest() ).actionGet();
return tasksResponse.pendingTasks().size();
}
@@ -674,114 +663,6 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
}
-// @Override
-// public ListenableActionFuture deleteAllVersionsOfEntity(final Id entityId ) {
-// String idString = IndexingUtils.idString(entityId).toLowerCase();
-//
-// final TermQueryBuilder tqb = QueryBuilders.termQuery(ENTITYID_ID_FIELDNAME, idString);
-//
-// //Added For Graphite Metrics
-// final Timer.Context timeDeleteAllVersions =allVersionsTimer.time();
-// final Timer.Context timeDeleteAllVersionsFuture = allVersionsTimerFuture.time();
-//
-// final ListenableActionFuture<DeleteByQueryResponse> response = esProvider.getClient()
-// .prepareDeleteByQuery( alias.getWriteAlias() ).setQuery( tqb ).execute();
-//
-// response.addListener( new ActionListener<DeleteByQueryResponse>() {
-//
-// @Override
-// public void onResponse( DeleteByQueryResponse response) {
-// timeDeleteAllVersions.stop();
-// logger
-// .debug( "Deleted entity {}:{} from all index scopes with response status = {}", entityId.getType(),
-// entityId.getUuid(), response.status().toString() );
-//
-// checkDeleteByQueryResponse(tqb, response);
-// }
-//
-//
-// @Override
-// public void onFailure( Throwable e ) {
-// timeDeleteAllVersions.stop();
-// logger.error( "Deleted entity {}:{} from all index scopes with error {}", entityId.getType(),
-// entityId.getUuid(), e);
-//
-//
-// }
-// });
-// timeDeleteAllVersionsFuture.stop();
-// return response;
-// }
-//
-//
-// @Override
-// public ListenableActionFuture deletePreviousVersions(final Id entityId, final UUID version) {
-//
-// String idString = IndexingUtils.idString( entityId ).toLowerCase();
-//
-// final FilteredQueryBuilder fqb = QueryBuilders.filteredQuery(
-// QueryBuilders.termQuery(ENTITYID_ID_FIELDNAME, idString),
-// FilterBuilders.rangeFilter(ENTITY_VERSION_FIELDNAME).lt(version.timestamp())
-// );
-//
-// //Added For Graphite Metrics
-// //Checks the time from the execute to the response below
-// final Timer.Context timeDeletePreviousVersions = deletePreviousTimer.time();
-// final Timer.Context timeDeletePreviousVersionFuture = deletePreviousTimerFuture.time();
-// final ListenableActionFuture<DeleteByQueryResponse> response = esProvider.getClient()
-// .prepareDeleteByQuery(alias.getWriteAlias()).setQuery(fqb).execute();
-//
-// //Added For Graphite Metrics
-// response.addListener(new ActionListener<DeleteByQueryResponse>() {
-// @Override
-// public void onResponse(DeleteByQueryResponse response) {
-// timeDeletePreviousVersions.stop();
-// //error message needs to be retooled so that it describes the entity more throughly
-// logger
-// .debug("Deleted entity {}:{} with version {} from all " + "index scopes with response status = {}",
-// entityId.getType(), entityId.getUuid(), version, response.status().toString());
-//
-// checkDeleteByQueryResponse( fqb, response );
-// }
-//
-//
-// @Override
-// public void onFailure( Throwable e ) {
-// timeDeletePreviousVersions.stop();
-// logger.error( "Deleted entity {}:{} from all index scopes with error {}", entityId.getType(),
-// entityId.getUuid(), e );
-// }
-// } );
-//
-// timeDeletePreviousVersionFuture.stop();
-//
-// return response;
-// }
-
-
- /**
- * Validate the response doesn't contain errors, if it does, fail fast at the first error we encounter
- */
- private void checkDeleteByQueryResponse(
- final QueryBuilder query, final DeleteByQueryResponse response ) {
-
- for ( IndexDeleteByQueryResponse indexDeleteByQueryResponse : response ) {
- final ShardOperationFailedException[] failures = indexDeleteByQueryResponse.getFailures();
-
- for ( ShardOperationFailedException failedException : failures ) {
- logger.error( String.format("Unable to delete by query %s. "
- + "Failed with code %d and reason %s on shard %s in index %s",
- query.toString(),
- failedException.status().getStatus(),
- failedException.reason(),
- failedException.shardId(),
- failedException.index() )
- );
- }
-
- }
- }
-
/**
* Completely delete an index.
@@ -856,8 +737,11 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
public Health getIndexHealth() {
try {
- ClusterHealthResponse chr = esProvider.getClient().admin().cluster().health(
- new ClusterHealthRequest(new String[]{indexIdentifier.getIndex(null)})).get();
+ final ActionFuture<ClusterHealthResponse> future = esProvider.getClient().admin().cluster().health(
+ new ClusterHealthRequest( new String[] { indexIdentifier.getIndex( null ) } ) );
+
+ //only wait 2 seconds max
+ ClusterHealthResponse chr = future.actionGet( 2000 );
return Health.valueOf( chr.getStatus().name() );
}
catch ( Exception ex ) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 836ec3d..5259a26 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -20,31 +20,29 @@
package org.apache.usergrid.persistence.index.impl;
import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+
+import org.apache.usergrid.persistence.core.future.BetterFuture;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.index.IndexBufferConsumer;
import org.apache.usergrid.persistence.index.IndexFig;
import org.apache.usergrid.persistence.index.IndexOperationMessage;
-import org.elasticsearch.action.ActionRequestBuilder;
+
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.delete.DeleteRequestBuilder;
-import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
-import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
-import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action1;
-import rx.functions.Action2;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.schedulers.Schedulers;
@@ -52,7 +50,6 @@ import rx.schedulers.Schedulers;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -69,6 +66,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
private final Timer flushTimer;
private final Counter indexSizeCounter;
+ private final Counter indexErrorCounter;
private final Meter flushMeter;
private final Timer produceTimer;
private final BufferQueue bufferQueue;
@@ -80,13 +78,28 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
private Object mutex = new Object();
+
+ private AtomicLong inFlight = new AtomicLong( );
+
@Inject
public EsIndexBufferConsumerImpl( final IndexFig config, final EsProvider provider, final MetricsFactory
metricsFactory, final BufferQueue bufferQueue, final IndexFig indexFig ){
- this.flushTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "index.buffer.flush");
- this.flushMeter = metricsFactory.getMeter(EsIndexBufferConsumerImpl.class, "index.buffer.meter");
- this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferConsumerImpl.class, "index.buffer.size");
+ this.flushTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "buffer.flush");
+ this.flushMeter = metricsFactory.getMeter(EsIndexBufferConsumerImpl.class, "buffer.meter");
+ this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferConsumerImpl.class, "buffer.size");
+ this.indexErrorCounter = metricsFactory.getCounter(EsIndexBufferConsumerImpl.class, "error.count");
+
+ //wire up the gauge of inflight messages
+ metricsFactory.addGauge( EsIndexBufferConsumerImpl.class, "inflight.meter", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return inFlight.longValue();
+ }
+ } );
+
+
+
this.config = config;
this.failureMonitor = new FailureMonitorImpl(config,provider);
this.client = provider.getClient();
@@ -130,66 +143,67 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
private void startWorker(){
synchronized ( mutex) {
- final AtomicInteger countFail = new AtomicInteger();
+ Observable<List<IndexOperationMessage>> consumer = Observable.create(
+ new Observable.OnSubscribe<List<IndexOperationMessage>>() {
+ @Override
+ public void call( final Subscriber<? super List<IndexOperationMessage>> subscriber ) {
- Observable<List<IndexOperationMessage>> consumer = Observable.create( new Observable.OnSubscribe<List<IndexOperationMessage>>() {
- @Override
- public void call( final Subscriber<? super List<IndexOperationMessage>> subscriber ) {
+ //name our thread so it's easy to see
+ Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() );
- //name our thread so it's easy to see
- Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() );
+ List<IndexOperationMessage> drainList;
+ do {
+ try {
- List<IndexOperationMessage> drainList;
- do {
- try {
+ Timer.Context timer = produceTimer.time();
- Timer.Context timer = produceTimer.time();
+ drainList = bufferQueue
+ .take( config.getIndexBufferSize(), config.getIndexBufferTimeout(),
+ TimeUnit.MILLISECONDS );
- drainList = bufferQueue.take( config.getIndexBufferSize(), config.getIndexBufferTimeout(),
- TimeUnit.MILLISECONDS );
+ subscriber.onNext( drainList );
- subscriber.onNext( drainList );
+ //take since we're in flight
+ inFlight.addAndGet( drainList.size() );
- timer.stop();
+ timer.stop();
+ }
- countFail.set( 0 );
- }
- catch ( EsRejectedExecutionException err ) {
- countFail.incrementAndGet();
- log.error(
- "Elasticsearch rejected our request, sleeping for {} milliseconds before retrying. " + "Failed {} consecutive times", config.getFailRefreshCount(),
- countFail.get() );
+ catch ( Exception e ) {
+ final long sleepTime = config.getFailureRetryTime();
- //es rejected the exception, sleep and retry in the queue
- try {
- Thread.sleep( config.getFailureRetryTime() );
- }
- catch ( InterruptedException e ) {
- //swallow
- }
- }
- catch ( Exception e ) {
- int count = countFail.incrementAndGet();
- log.error( "failed to dequeue", e );
- if ( count > 200 ) {
- log.error( "Shutting down index drain due to repetitive failures" );
+ log.error( "Failed to dequeue. Sleeping for {} milliseconds", sleepTime, e );
+
+ try {
+ Thread.sleep( sleepTime );
+ }
+ catch ( InterruptedException ie ) {
+ //swallow
+ }
+
+ indexErrorCounter.inc();
}
}
+ while ( true );
}
- while ( true );
- }
- } ).subscribeOn( Schedulers.newThread() ).doOnNext( new Action1<List<IndexOperationMessage>>() {
+ } ).subscribeOn( Schedulers.newThread() ).doOnNext( new Action1<List<IndexOperationMessage>>() {
@Override
public void call( List<IndexOperationMessage> containerList ) {
- if ( containerList.size() > 0 ) {
- flushMeter.mark( containerList.size() );
- Timer.Context time = flushTimer.time();
- execute( containerList );
- time.stop();
+ if ( containerList.size() == 0 ) {
+ return;
}
+
+ flushMeter.mark( containerList.size() );
+ Timer.Context time = flushTimer.time();
+
+
+ execute( containerList );
+
+ time.stop();
+
}
} )
//ack after we process
@@ -197,6 +211,16 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
@Override
public void call( final List<IndexOperationMessage> indexOperationMessages ) {
bufferQueue.ack( indexOperationMessages );
+ //release so we know we've done processing
+ inFlight.addAndGet( -1 * indexOperationMessages.size() );
+ }
+ } ).doOnError( new Action1<Throwable>() {
+ @Override
+ public void call( final Throwable throwable ) {
+
+ log.error( "An exception occurred when trying to deque and write to elasticsearch. Ignoring",
+ throwable );
+ indexErrorCounter.inc();
}
} );
@@ -236,7 +260,8 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
//collection all the operations into a single stream
.reduce( initRequest(), new Func2<BulkRequestBuilder, BatchRequest, BulkRequestBuilder>() {
@Override
- public BulkRequestBuilder call( final BulkRequestBuilder bulkRequestBuilder, final BatchRequest batchRequest ) {
+ public BulkRequestBuilder call( final BulkRequestBuilder bulkRequestBuilder,
+ final BatchRequest batchRequest ) {
batchRequest.doOperation( client, bulkRequestBuilder );
return bulkRequestBuilder;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
index 57c2fab..50b994d 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
@@ -35,12 +35,6 @@ public class TestIndexModule extends TestModule {
// configure collections and our core astyanax framework
install( new CollectionModule() );
- install( new IndexModule() {
- @Override
- public void wireBufferQueue() {
- bind( BufferQueue.class).to( BufferQueueInMemoryImpl.class );
-// bind( BufferQueue.class).to( BufferQueueSQSImpl.class );
- }
- } );
+ install( new IndexModule() );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java
index 6922c15..9a362cb 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java
@@ -38,6 +38,7 @@ import org.apache.usergrid.persistence.index.IndexFig;
import org.apache.usergrid.persistence.index.IndexOperationMessage;
import org.apache.usergrid.persistence.index.guice.TestIndexModule;
import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.queue.NoAWSCredsRule;
import org.apache.usergrid.persistence.queue.QueueManagerFactory;
import org.apache.usergrid.persistence.queue.impl.UsergridAwsCredentialsProvider;
@@ -59,6 +60,10 @@ public class BufferQueueSQSImplTest {
@Rule
public MigrationManagerRule migrationManagerRule;
+
+ @Rule
+ public NoAWSCredsRule noAwsCredsRule = new NoAWSCredsRule();
+
@Inject
public QueueManagerFactory queueManagerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
index c5f3488..a399809 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
@@ -140,7 +140,6 @@ public class EntityConnectionIndexImplTest extends BaseIT {
personLikesIndex.refresh();
- EsTestUtils.waitForTasks(personLikesIndex);
Thread.sleep( 2000 );
// now, let's search for muffins
@@ -270,8 +269,6 @@ public class EntityConnectionIndexImplTest extends BaseIT {
batch.execute().get();
personLikesIndex.refresh();
- EsTestUtils.waitForTasks( personLikesIndex );
- Thread.sleep( 2000 );
// now, let's search for muffins
CandidateResults likes = personLikesIndex.search( searchScope,
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index a2135a3..ca9bf79 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -94,7 +94,6 @@ public class EntityIndexTest extends BaseIT {
entityIndex.refresh();
- Thread.sleep(100000000);
testQueries( indexScope, searchTypes, entityIndex );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EsTestUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EsTestUtils.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EsTestUtils.java
deleted file mode 100644
index 30f0ed0..0000000
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EsTestUtils.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.index.impl;
-
-
-import org.apache.usergrid.persistence.index.EntityIndex;
-
-
-/**
- * Utilities to make testing ES easier
- */
-public class EsTestUtils {
-
-
- /**
- * Checks to see if we have pending tasks in the cluster. If so waits until they are finished. Adding
- * new types can cause lag even after refresh since the type mapping needs applied
- * @param index
- */
- public static void waitForTasks(final EntityIndex index){
-
- while(index.getPendingTasks() > 0){
- try {
- Thread.sleep( 100 );
- }
- catch ( InterruptedException e ) {
- //swallow
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
index 35ed091..854c3e0 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
@@ -540,7 +540,6 @@ public class ManagementServiceImpl implements ManagementService {
em.addToCollection( organizationEntity, "users", new SimpleEntityRef( User.ENTITY_TYPE, user.getUuid() ) );
- em.refreshIndex();
writeUserToken( smf.getManagementAppId(), organizationEntity, encryptionService
.plainTextCredentials( generateOAuthSecretKey( AuthPrincipalType.ORGANIZATION ), user.getUuid(),
@@ -557,7 +556,7 @@ public class ManagementServiceImpl implements ManagementService {
startOrganizationActivationFlow( organization );
- em.refreshIndex();
+
return organization;
}
@@ -1649,7 +1648,7 @@ public class ManagementServiceImpl implements ManagementService {
properties.put( "appUuid", applicationId );
Entity appInfo = em.create( applicationId, APPLICATION_INFO, properties );
- em.refreshIndex();
+
writeUserToken( smf.getManagementAppId(), appInfo, encryptionService
.plainTextCredentials( generateOAuthSecretKey( AuthPrincipalType.APPLICATION ), null,
@@ -1670,7 +1669,7 @@ public class ManagementServiceImpl implements ManagementService {
+ ")</a> created a new application named " + applicationName, null );
}
- em.refreshIndex();
+
return new ApplicationInfo( applicationId, appInfo.getName() );
}
@@ -2376,7 +2375,7 @@ public class ManagementServiceImpl implements ManagementService {
if ( sendEmail ) {
startOrganizationActivationFlow( organization );
}
- em.refreshIndex();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
index 5eed002..abd77ee 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
@@ -16,39 +16,56 @@
*/
package org.apache.usergrid.services.notifications;
-import java.util.*;
-import com.codahale.metrics.*;
-import com.codahale.metrics.Timer;
-import com.google.inject.Injector;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import org.apache.usergrid.corepersistence.CpSetup;
import org.apache.usergrid.mq.Message;
-import org.apache.usergrid.persistence.*;
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityManagerFactory;
+import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.PathQuery;
+import org.apache.usergrid.persistence.SimpleEntityRef;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.entities.Device;
import org.apache.usergrid.persistence.entities.Notification;
import org.apache.usergrid.persistence.entities.Notifier;
import org.apache.usergrid.persistence.entities.Receipt;
+import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException;
import org.apache.usergrid.persistence.index.query.Identifier;
import org.apache.usergrid.persistence.index.query.Query;
import org.apache.usergrid.persistence.queue.QueueManager;
import org.apache.usergrid.persistence.queue.QueueManagerFactory;
import org.apache.usergrid.persistence.queue.QueueScope;
-import org.apache.usergrid.persistence.queue.QueueScopeFactory;
-import org.apache.usergrid.services.*;
+import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
+import org.apache.usergrid.services.AbstractCollectionService;
+import org.apache.usergrid.services.ServiceAction;
+import org.apache.usergrid.services.ServiceContext;
+import org.apache.usergrid.services.ServiceInfo;
+import org.apache.usergrid.services.ServiceManagerFactory;
+import org.apache.usergrid.services.ServiceParameter;
+import org.apache.usergrid.services.ServicePayload;
+import org.apache.usergrid.services.ServiceRequest;
+import org.apache.usergrid.services.ServiceResults;
+import org.apache.usergrid.services.exceptions.ForbiddenServiceOperationException;
import org.apache.usergrid.services.notifications.impl.ApplicationQueueManagerImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.usergrid.persistence.entities.Device;
-import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException;
-import org.apache.usergrid.services.exceptions.ForbiddenServiceOperationException;
-import static org.apache.usergrid.utils.InflectionUtils.pluralize;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
+import com.google.inject.Injector;
import rx.Observable;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
+import static org.apache.usergrid.utils.InflectionUtils.pluralize;
+
public class NotificationsService extends AbstractCollectionService {
@@ -90,9 +107,8 @@ public class NotificationsService extends AbstractCollectionService {
postMeter = metricsService.getMeter(NotificationsService.class, "requests");
postTimer = metricsService.getTimer(this.getClass(), "execution_rest");
JobScheduler jobScheduler = new JobScheduler(sm,em);
- String name = ApplicationQueueManagerImpl.getQueueNames(props);
- QueueScopeFactory queueScopeFactory = getApplicationContext().getBean( Injector.class ).getInstance(QueueScopeFactory.class);
- QueueScope queueScope = queueScopeFactory.getScope(smf.getManagementAppId(), name);
+ String name = ApplicationQueueManagerImpl.getQueueNames( props );
+ QueueScope queueScope = new QueueScopeImpl( name );
queueManagerFactory = getApplicationContext().getBean( Injector.class ).getInstance(QueueManagerFactory.class);
QueueManager queueManager = TEST_QUEUE_MANAGER !=null ? TEST_QUEUE_MANAGER : queueManagerFactory.getQueueManager(queueScope);
notificationQueueManager = new ApplicationQueueManagerImpl(jobScheduler,em,queueManager,metricsService,props);