You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/02/27 00:38:37 UTC

[21/24] incubator-usergrid git commit: indexbuffer: update method names

indexbuffer: update method names


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/05f52513
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/05f52513
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/05f52513

Branch: refs/heads/two-dot-o
Commit: 05f52513e491e27d9bb0d26374e315aca9b00f8d
Parents: 382610d
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Feb 26 14:28:20 2015 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Feb 26 14:28:20 2015 -0700

----------------------------------------------------------------------
 .../index/IndexOperationMessage.java            | 14 ++++++-
 .../index/impl/EsEntityIndexBatchImpl.java      |  4 +-
 .../index/impl/EsIndexBufferConsumerImpl.java   | 40 ++++++++------------
 .../index/impl/EsIndexBufferProducerImpl.java   |  4 +-
 4 files changed, 32 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/05f52513/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
index 3a0a702..501233e 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
@@ -41,12 +41,22 @@ public  class IndexOperationMessage {
         });
     }
 
-    public void add(ShardReplicationOperationRequestBuilder builder){
+    public void addOperation(ShardReplicationOperationRequestBuilder builder){
         builders.add(builder);
     }
-    public ConcurrentLinkedQueue<ShardReplicationOperationRequestBuilder> getBuilder(){
+
+    /**
+     * Returns the operations that have been added to this message.
+     * @return the queue of operation builders held by this message
+     */
+    public ConcurrentLinkedQueue<ShardReplicationOperationRequestBuilder> getOperations(){
         return builders;
     }
+
+    /**
+     * Returns the future (promise) associated with this message.
+     * @return the future held by this message
+     */
     public BetterFuture<IndexOperationMessage> getFuture(){
         return containerFuture;
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/05f52513/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 2e0fb56..c2a3fdc 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -127,7 +127,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
         final String entityType = entity.getId().getType();
         IndexRequestBuilder builder =
                 client.prepareIndex(alias.getWriteAlias(), entityType, indexId).setSource( entityAsMap );
-        container.add(builder);
+        container.addOperation(builder);
         return this;
     }
 
@@ -173,7 +173,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
                    public Object call(String index) {
                        try {
                            DeleteRequestBuilder builder = client.prepareDelete(index, entityType, indexId).setRefresh(refresh);
-                           container.add(builder);
+                           container.addOperation(builder);
                        }catch (Exception e){
                            log.error("failed to deindex",e);
                            throw e;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/05f52513/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index eaca9bd..09c7097 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -45,7 +45,6 @@ import rx.schedulers.Schedulers;
 
 import java.util.List;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Consumer for IndexOperationMessages
@@ -69,22 +68,15 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
         this.failureMonitor = new FailureMonitorImpl(config,provider);
         this.client = provider.getClient();
 
-        final AtomicLong queueSize = new AtomicLong();
         //batch up sets of some size and send them in batch
         this.consumer = Observable.create(producer)
             .subscribeOn(Schedulers.io())
-            .doOnNext(new Action1<IndexOperationMessage>() {
-                @Override
-                public void call(IndexOperationMessage requestBuilderContainer) {
-                    queueSize.addAndGet(requestBuilderContainer.getBuilder().size());
-                }
-            })
             .buffer(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, config.getIndexBufferSize())
             .doOnNext(new Action1<List<IndexOperationMessage>>() {
                 @Override
                 public void call(List<IndexOperationMessage> containerList) {
                     flushTimer.time();
-                    if(containerList.size()>0){
+                    if (containerList.size() > 0) {
                         execute(containerList);
                     }
                 }
@@ -107,13 +99,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
             .flatMap(new Func1<IndexOperationMessage, Observable<ShardReplicationOperationRequestBuilder>>() {
                 @Override
                 public Observable<ShardReplicationOperationRequestBuilder> call(IndexOperationMessage operationMessage) {
-                    return Observable.from(operationMessage.getBuilder())
-                        .map(new Func1<ShardReplicationOperationRequestBuilder, ShardReplicationOperationRequestBuilder>() {
-                            @Override
-                            public ShardReplicationOperationRequestBuilder call(ShardReplicationOperationRequestBuilder builder) {
-                                return builder;
-                            }
-                        });
+                    return Observable.from(operationMessage.getOperations());
                 }
             });
 
@@ -123,17 +109,21 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
             .doOnNext(new Action1<List<ShardReplicationOperationRequestBuilder>>() {
                 @Override
                 public void call(List<ShardReplicationOperationRequestBuilder> builders) {
-                    final BulkRequestBuilder bulkRequest = initRequest();
-                    for (ShardReplicationOperationRequestBuilder builder : builders) {
-                        indexSizeCounter.dec();
-                        if (builder instanceof IndexRequestBuilder) {
-                            bulkRequest.add((IndexRequestBuilder) builder);
-                        }
-                        if (builder instanceof DeleteRequestBuilder) {
-                            bulkRequest.add((DeleteRequestBuilder) builder);
+                    try {
+                        final BulkRequestBuilder bulkRequest = initRequest();
+                        for (ShardReplicationOperationRequestBuilder builder : builders) {
+                            indexSizeCounter.dec();
+                            if (builder instanceof IndexRequestBuilder) {
+                                bulkRequest.add((IndexRequestBuilder) builder);
+                            }
+                            if (builder instanceof DeleteRequestBuilder) {
+                                bulkRequest.add((DeleteRequestBuilder) builder);
+                            }
                         }
+                        sendRequest(bulkRequest);
+                    }catch (Exception e){
+                        log.error("Failed while sending bulk",e);
                     }
-                    sendRequest(bulkRequest);
                 }
             })
             .toBlocking().lastOrDefault(null);

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/05f52513/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
index 791cea8..29f243b 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.persistence.index.impl;
 
 import com.codahale.metrics.Counter;
+import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import org.apache.usergrid.persistence.core.future.BetterFuture;
@@ -48,7 +49,8 @@ public class EsIndexBufferProducerImpl implements IndexBufferProducer {
     }
 
     public BetterFuture put(IndexOperationMessage message){
-        indexSizeCounter.inc(message.getBuilder().size());
+        Preconditions.checkNotNull(message,"Message cannot be null");
+        indexSizeCounter.inc(message.getOperations().size());
         subscriber.onNext(message);
         return message.getFuture();
     }