You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/03/04 23:57:28 UTC
[04/50] [abbrv] incubator-usergrid git commit: indexbuffer: update method names
indexbuffer: update method names
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/05f52513
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/05f52513
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/05f52513
Branch: refs/heads/USERGRID-405
Commit: 05f52513e491e27d9bb0d26374e315aca9b00f8d
Parents: 382610d
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Feb 26 14:28:20 2015 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Feb 26 14:28:20 2015 -0700
----------------------------------------------------------------------
.../index/IndexOperationMessage.java | 14 ++++++-
.../index/impl/EsEntityIndexBatchImpl.java | 4 +-
.../index/impl/EsIndexBufferConsumerImpl.java | 40 ++++++++------------
.../index/impl/EsIndexBufferProducerImpl.java | 4 +-
4 files changed, 32 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/05f52513/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
index 3a0a702..501233e 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
@@ -41,12 +41,22 @@ public class IndexOperationMessage {
});
}
- public void add(ShardReplicationOperationRequestBuilder builder){
+ public void addOperation(ShardReplicationOperationRequestBuilder builder){
builders.add(builder);
}
- public ConcurrentLinkedQueue<ShardReplicationOperationRequestBuilder> getBuilder(){
+
+ /**
+ * Returns the operations held by this message.
+ * @return the queue of request builders added to this message
+ */
+ public ConcurrentLinkedQueue<ShardReplicationOperationRequestBuilder> getOperations(){
return builders;
}
+
+ /**
+ * Returns the promise (future) associated with this message.
+ * @return the future for this message
+ */
public BetterFuture<IndexOperationMessage> getFuture(){
return containerFuture;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/05f52513/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 2e0fb56..c2a3fdc 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -127,7 +127,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
final String entityType = entity.getId().getType();
IndexRequestBuilder builder =
client.prepareIndex(alias.getWriteAlias(), entityType, indexId).setSource( entityAsMap );
- container.add(builder);
+ container.addOperation(builder);
return this;
}
@@ -173,7 +173,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
public Object call(String index) {
try {
DeleteRequestBuilder builder = client.prepareDelete(index, entityType, indexId).setRefresh(refresh);
- container.add(builder);
+ container.addOperation(builder);
}catch (Exception e){
log.error("failed to deindex",e);
throw e;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/05f52513/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index eaca9bd..09c7097 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -45,7 +45,6 @@ import rx.schedulers.Schedulers;
import java.util.List;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
/**
* Consumer for IndexOperationMessages
@@ -69,22 +68,15 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
this.failureMonitor = new FailureMonitorImpl(config,provider);
this.client = provider.getClient();
- final AtomicLong queueSize = new AtomicLong();
//batch up sets of some size and send them in batch
this.consumer = Observable.create(producer)
.subscribeOn(Schedulers.io())
- .doOnNext(new Action1<IndexOperationMessage>() {
- @Override
- public void call(IndexOperationMessage requestBuilderContainer) {
- queueSize.addAndGet(requestBuilderContainer.getBuilder().size());
- }
- })
.buffer(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, config.getIndexBufferSize())
.doOnNext(new Action1<List<IndexOperationMessage>>() {
@Override
public void call(List<IndexOperationMessage> containerList) {
flushTimer.time();
- if(containerList.size()>0){
+ if (containerList.size() > 0) {
execute(containerList);
}
}
@@ -107,13 +99,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
.flatMap(new Func1<IndexOperationMessage, Observable<ShardReplicationOperationRequestBuilder>>() {
@Override
public Observable<ShardReplicationOperationRequestBuilder> call(IndexOperationMessage operationMessage) {
- return Observable.from(operationMessage.getBuilder())
- .map(new Func1<ShardReplicationOperationRequestBuilder, ShardReplicationOperationRequestBuilder>() {
- @Override
- public ShardReplicationOperationRequestBuilder call(ShardReplicationOperationRequestBuilder builder) {
- return builder;
- }
- });
+ return Observable.from(operationMessage.getOperations());
}
});
@@ -123,17 +109,21 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
.doOnNext(new Action1<List<ShardReplicationOperationRequestBuilder>>() {
@Override
public void call(List<ShardReplicationOperationRequestBuilder> builders) {
- final BulkRequestBuilder bulkRequest = initRequest();
- for (ShardReplicationOperationRequestBuilder builder : builders) {
- indexSizeCounter.dec();
- if (builder instanceof IndexRequestBuilder) {
- bulkRequest.add((IndexRequestBuilder) builder);
- }
- if (builder instanceof DeleteRequestBuilder) {
- bulkRequest.add((DeleteRequestBuilder) builder);
+ try {
+ final BulkRequestBuilder bulkRequest = initRequest();
+ for (ShardReplicationOperationRequestBuilder builder : builders) {
+ indexSizeCounter.dec();
+ if (builder instanceof IndexRequestBuilder) {
+ bulkRequest.add((IndexRequestBuilder) builder);
+ }
+ if (builder instanceof DeleteRequestBuilder) {
+ bulkRequest.add((DeleteRequestBuilder) builder);
+ }
}
+ sendRequest(bulkRequest);
+ }catch (Exception e){
+ log.error("Failed while sending bulk",e);
}
- sendRequest(bulkRequest);
}
})
.toBlocking().lastOrDefault(null);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/05f52513/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
index 791cea8..29f243b 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
@@ -20,6 +20,7 @@
package org.apache.usergrid.persistence.index.impl;
import com.codahale.metrics.Counter;
+import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import org.apache.usergrid.persistence.core.future.BetterFuture;
@@ -48,7 +49,8 @@ public class EsIndexBufferProducerImpl implements IndexBufferProducer {
}
public BetterFuture put(IndexOperationMessage message){
- indexSizeCounter.inc(message.getBuilder().size());
+ Preconditions.checkNotNull(message,"Message cannot be null");
+ indexSizeCounter.inc(message.getOperations().size());
subscriber.onNext(message);
return message.getFuture();
}