You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/02/18 02:02:03 UTC

[2/3] incubator-usergrid git commit: add blocking queue

add blocking queue


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/e4782575
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/e4782575
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/e4782575

Branch: refs/heads/USERGRID-273-indexbuffer
Commit: e478257513fc193d0c1dce97fab6368e080bdeeb
Parents: 7e0e0be
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Feb 17 17:01:34 2015 -0800
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Feb 17 17:01:34 2015 -0800

----------------------------------------------------------------------
 .../apache/usergrid/metrics/MetricsFactory.java |  3 +-
 .../core/metrics/MetricsFactoryImpl.java        |  2 +
 .../index/impl/IndexBatchBufferImpl.java        | 89 ++++++++++++--------
 3 files changed, 57 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e4782575/stack/core/src/main/java/org/apache/usergrid/metrics/MetricsFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/metrics/MetricsFactory.java b/stack/core/src/main/java/org/apache/usergrid/metrics/MetricsFactory.java
index bad07ad..fccc296 100644
--- a/stack/core/src/main/java/org/apache/usergrid/metrics/MetricsFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/metrics/MetricsFactory.java
@@ -31,9 +31,10 @@ import java.util.concurrent.TimeUnit;
 
 /**
  * Singleton class to manage metrics.
+ * TODO:remove this in favor of cp one
  */
 @Component("metricsFactory")
-public class MetricsFactory {
+public class MetricsFactory implements org.apache.usergrid.persistence.core.metrics.MetricsFactory {
     @Autowired
     private Properties properties;
     public MetricRegistry registry;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e4782575/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactoryImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactoryImpl.java
index 201ee0f..2528070 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactoryImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactoryImpl.java
@@ -20,6 +20,7 @@ package org.apache.usergrid.persistence.core.metrics;
 import com.codahale.metrics.*;
 import com.codahale.metrics.graphite.Graphite;
 import com.codahale.metrics.graphite.GraphiteReporter;
+import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,6 +42,7 @@ public class MetricsFactoryImpl implements MetricsFactory {
     private ConcurrentHashMap<String,Metric> hashMap;
     private static final Logger LOG = LoggerFactory.getLogger(MetricsFactoryImpl.class);
 
+    @Inject
     public MetricsFactoryImpl(MetricsFig metricsFig) {
         this.metricsFig = metricsFig;
         registry = new MetricRegistry();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e4782575/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
index 29d6e4d..08ed17b 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.usergrid.persistence.index.impl;
 
+import com.codahale.metrics.Counter;
 import com.codahale.metrics.Timer;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
@@ -36,7 +37,10 @@ import rx.Subscriber;
 import rx.Subscription;
 import rx.functions.Action1;
 
+import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 
@@ -55,9 +59,10 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
     private final int bufferSize;
     private final MetricsFactory metricsFactory;
     private final Timer flushTimer;
-    private BulkRequestBuilder bulkRequest;
+    private final Counter indexSizeCounter;
     private Producer producer;
     private Subscription producerObservable;
+    private ArrayBlockingQueue blockingQueue;
 
     @Inject
     public IndexBatchBufferImpl(final IndexFig config, final EsProvider provider, MetricsFactory metricsFactory){
@@ -69,26 +74,23 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
         this.refresh = config.isForcedRefresh();
         this.timeout = config.getIndexBufferTimeout();
         this.bufferSize = config.getIndexBufferSize();
-        this.flushTimer = metricsFactory.getTimer(IndexBatchBuffer.class,"index.buffer.flush");
-        clearBulk();
+        this.flushTimer = metricsFactory.getTimer(IndexBatchBuffer.class, "index.buffer.flush");
+        this.indexSizeCounter =  metricsFactory.getCounter(IndexBatchBuffer.class, "index.buffer.size");
+        blockingQueue = new ArrayBlockingQueue(500);
         init();
     }
 
-    private void clearBulk() {
-        bulkRequest = client.prepareBulk();
-    }
+
 
     private void init() {
         this.producerObservable = Observable.create(producer)
                 .doOnNext(new Action1<RequestBuilderContainer>() {
                     @Override
                     public void call(RequestBuilderContainer container) {
-                        ShardReplicationOperationRequestBuilder builder = container.getBuilder();
-                        if (builder instanceof IndexRequestBuilder) {
-                            bulkRequest.add((IndexRequestBuilder) builder);
-                        }
-                        if (builder instanceof DeleteRequestBuilder) {
-                            bulkRequest.add((DeleteRequestBuilder)builder);
+                        try {
+                            blockingQueue.offer(container, 2500, TimeUnit.MILLISECONDS);
+                        }catch (InterruptedException ie){
+                            throw new RuntimeException(ie);
                         }
                     }
                 })
@@ -97,7 +99,7 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
                     @Override
                     public void call(List<RequestBuilderContainer> builderContainerList) {
                         flushTimer.time();
-                        metricsFactory.getCounter(IndexBatchBuffer.class,"index.buffer.size").dec(builderContainerList.size());
+                        indexSizeCounter.dec(builderContainerList.size());
                         execute();
                     }
                 })
@@ -115,46 +117,61 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
     }
 
     public void flushAndRefresh(){
-        execute(bulkRequest.setRefresh(true));
+        execute(true);
     }
     public void flush(){
         execute();
     }
 
     private void execute(){
-        execute(bulkRequest.setRefresh(refresh));
+        execute(this.refresh);
     }
 
     /**
      * Execute the request, check for errors, then re-init the batch for future use
      */
-    private void execute( final BulkRequestBuilder request ) {
-        //nothing to do, we haven't added anthing to the index
-        if ( request.numberOfActions() == 0 ) {
-            return;
-        }
+    private synchronized void execute(boolean refresh ) {
+        try {
+            BulkRequestBuilder bulkRequest = client.prepareBulk();
+            bulkRequest.setRefresh(refresh);
+            int count = bufferSize;
+            while (blockingQueue.size() > 0 && count-- > 0) {
+                RequestBuilderContainer container = (RequestBuilderContainer)blockingQueue.take();
+                ShardReplicationOperationRequestBuilder builder = container.getBuilder();
+                if (builder instanceof IndexRequestBuilder) {
+                    bulkRequest.add((IndexRequestBuilder) builder);
+                }
+                if (builder instanceof DeleteRequestBuilder) {
+                    bulkRequest.add((DeleteRequestBuilder) builder);
+                }
+            }
+            //nothing to do, we haven't added anything to the index
+            if (bulkRequest.numberOfActions() == 0) {
+                return;
+            }
 
-        final BulkResponse responses;
+            final BulkResponse responses;
 
-        try {
-            responses = request.execute().actionGet();
-        }
-        catch ( Throwable t ) {
-            log.error( "Unable to communicate with elasticsearch" );
-            failureMonitor.fail( "Unable to execute batch", t );
-            throw t;
-        }
+            try {
+                responses = bulkRequest.execute().actionGet();
+            } catch (Throwable t) {
+                log.error("Unable to communicate with elasticsearch");
+                failureMonitor.fail("Unable to execute batch", t);
+                throw t;
+            }
 
-        failureMonitor.success();
+            failureMonitor.success();
 
-        for ( BulkItemResponse response : responses ) {
-            if ( response.isFailed() ) {
-                throw new RuntimeException( "Unable to index documents.  Errors are :"
-                        + response.getFailure().getMessage() );
+            for (BulkItemResponse response : responses) {
+                if (response.isFailed()) {
+                    throw new RuntimeException("Unable to index documents.  Errors are :"
+                            + response.getFailure().getMessage());
+                }
             }
+        }catch (InterruptedException ie){
+            log.error("Problem taking messages off of queue",ie);
+            throw new RuntimeException(ie);
         }
-
-        clearBulk();
     }
 
     private static class Producer implements Observable.OnSubscribe<RequestBuilderContainer> {