You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/03/03 01:21:51 UTC
[03/27] incubator-usergrid git commit: add blocking queue
add blocking queue
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/e4782575
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/e4782575
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/e4782575
Branch: refs/heads/USERGRID-280
Commit: e478257513fc193d0c1dce97fab6368e080bdeeb
Parents: 7e0e0be
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Feb 17 17:01:34 2015 -0800
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Feb 17 17:01:34 2015 -0800
----------------------------------------------------------------------
.../apache/usergrid/metrics/MetricsFactory.java | 3 +-
.../core/metrics/MetricsFactoryImpl.java | 2 +
.../index/impl/IndexBatchBufferImpl.java | 89 ++++++++++++--------
3 files changed, 57 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e4782575/stack/core/src/main/java/org/apache/usergrid/metrics/MetricsFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/metrics/MetricsFactory.java b/stack/core/src/main/java/org/apache/usergrid/metrics/MetricsFactory.java
index bad07ad..fccc296 100644
--- a/stack/core/src/main/java/org/apache/usergrid/metrics/MetricsFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/metrics/MetricsFactory.java
@@ -31,9 +31,10 @@ import java.util.concurrent.TimeUnit;
/**
* Singleton class to manage metrics.
+ * TODO:remove this in favor of cp one
*/
@Component("metricsFactory")
-public class MetricsFactory {
+public class MetricsFactory implements org.apache.usergrid.persistence.core.metrics.MetricsFactory {
@Autowired
private Properties properties;
public MetricRegistry registry;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e4782575/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactoryImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactoryImpl.java
index 201ee0f..2528070 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactoryImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactoryImpl.java
@@ -20,6 +20,7 @@ package org.apache.usergrid.persistence.core.metrics;
import com.codahale.metrics.*;
import com.codahale.metrics.graphite.Graphite;
import com.codahale.metrics.graphite.GraphiteReporter;
+import com.google.inject.Inject;
import com.google.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,6 +42,7 @@ public class MetricsFactoryImpl implements MetricsFactory {
private ConcurrentHashMap<String,Metric> hashMap;
private static final Logger LOG = LoggerFactory.getLogger(MetricsFactoryImpl.class);
+ @Inject
public MetricsFactoryImpl(MetricsFig metricsFig) {
this.metricsFig = metricsFig;
registry = new MetricRegistry();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e4782575/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
index 29d6e4d..08ed17b 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
@@ -16,6 +16,7 @@
*/
package org.apache.usergrid.persistence.index.impl;
+import com.codahale.metrics.Counter;
import com.codahale.metrics.Timer;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -36,7 +37,10 @@ import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action1;
+import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -55,9 +59,10 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
private final int bufferSize;
private final MetricsFactory metricsFactory;
private final Timer flushTimer;
- private BulkRequestBuilder bulkRequest;
+ private final Counter indexSizeCounter;
private Producer producer;
private Subscription producerObservable;
+ private ArrayBlockingQueue blockingQueue;
@Inject
public IndexBatchBufferImpl(final IndexFig config, final EsProvider provider, MetricsFactory metricsFactory){
@@ -69,26 +74,23 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
this.refresh = config.isForcedRefresh();
this.timeout = config.getIndexBufferTimeout();
this.bufferSize = config.getIndexBufferSize();
- this.flushTimer = metricsFactory.getTimer(IndexBatchBuffer.class,"index.buffer.flush");
- clearBulk();
+ this.flushTimer = metricsFactory.getTimer(IndexBatchBuffer.class, "index.buffer.flush");
+ this.indexSizeCounter = metricsFactory.getCounter(IndexBatchBuffer.class, "index.buffer.size");
+ blockingQueue = new ArrayBlockingQueue(500);
init();
}
- private void clearBulk() {
- bulkRequest = client.prepareBulk();
- }
+
private void init() {
this.producerObservable = Observable.create(producer)
.doOnNext(new Action1<RequestBuilderContainer>() {
@Override
public void call(RequestBuilderContainer container) {
- ShardReplicationOperationRequestBuilder builder = container.getBuilder();
- if (builder instanceof IndexRequestBuilder) {
- bulkRequest.add((IndexRequestBuilder) builder);
- }
- if (builder instanceof DeleteRequestBuilder) {
- bulkRequest.add((DeleteRequestBuilder)builder);
+ try {
+ blockingQueue.offer(container, 2500, TimeUnit.MILLISECONDS);
+ }catch (InterruptedException ie){
+ throw new RuntimeException(ie);
}
}
})
@@ -97,7 +99,7 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
@Override
public void call(List<RequestBuilderContainer> builderContainerList) {
flushTimer.time();
- metricsFactory.getCounter(IndexBatchBuffer.class,"index.buffer.size").dec(builderContainerList.size());
+ indexSizeCounter.dec(builderContainerList.size());
execute();
}
})
@@ -115,46 +117,61 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
}
public void flushAndRefresh(){
- execute(bulkRequest.setRefresh(true));
+ execute(true);
}
public void flush(){
execute();
}
private void execute(){
- execute(bulkRequest.setRefresh(refresh));
+ execute(this.refresh);
}
/**
* Execute the request, check for errors, then re-init the batch for future use
*/
- private void execute( final BulkRequestBuilder request ) {
- //nothing to do, we haven't added anthing to the index
- if ( request.numberOfActions() == 0 ) {
- return;
- }
+ private synchronized void execute(boolean refresh ) {
+ try {
+ BulkRequestBuilder bulkRequest = client.prepareBulk();
+ bulkRequest.setRefresh(refresh);
+ int count = bufferSize;
+ while (blockingQueue.size() > 0 && count-- > 0) {
+ RequestBuilderContainer container = (RequestBuilderContainer)blockingQueue.take();
+ ShardReplicationOperationRequestBuilder builder = container.getBuilder();
+ if (builder instanceof IndexRequestBuilder) {
+ bulkRequest.add((IndexRequestBuilder) builder);
+ }
+ if (builder instanceof DeleteRequestBuilder) {
+ bulkRequest.add((DeleteRequestBuilder) builder);
+ }
+ }
+ //nothing to do, we haven't added anthing to the index
+ if (bulkRequest.numberOfActions() == 0) {
+ return;
+ }
- final BulkResponse responses;
+ final BulkResponse responses;
- try {
- responses = request.execute().actionGet();
- }
- catch ( Throwable t ) {
- log.error( "Unable to communicate with elasticsearch" );
- failureMonitor.fail( "Unable to execute batch", t );
- throw t;
- }
+ try {
+ responses = bulkRequest.execute().actionGet();
+ } catch (Throwable t) {
+ log.error("Unable to communicate with elasticsearch");
+ failureMonitor.fail("Unable to execute batch", t);
+ throw t;
+ }
- failureMonitor.success();
+ failureMonitor.success();
- for ( BulkItemResponse response : responses ) {
- if ( response.isFailed() ) {
- throw new RuntimeException( "Unable to index documents. Errors are :"
- + response.getFailure().getMessage() );
+ for (BulkItemResponse response : responses) {
+ if (response.isFailed()) {
+ throw new RuntimeException("Unable to index documents. Errors are :"
+ + response.getFailure().getMessage());
+ }
}
+ }catch (InterruptedException ie){
+ log.error("Problem taking messages off of queue",ie);
+ throw new RuntimeException(ie);
}
-
- clearBulk();
}
private static class Producer implements Observable.OnSubscribe<RequestBuilderContainer> {