You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/02/25 17:33:05 UTC
[1/2] incubator-usergrid git commit: add batches of batches
Repository: incubator-usergrid
Updated Branches:
refs/heads/USERGRID-273-indexbuffer 3231534d7 -> ac55e0878
add batches of batches
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/420f5009
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/420f5009
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/420f5009
Branch: refs/heads/USERGRID-273-indexbuffer
Commit: 420f50093e90c2d764d7415a48b1bfb204a1e1f4
Parents: 3231534
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Feb 24 18:57:15 2015 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Feb 24 18:57:15 2015 -0700
----------------------------------------------------------------------
.../persistence/index/IndexBatchBuffer.java | 11 +--
.../index/RequestBuilderContainer.java | 56 ++++++++++++
.../index/impl/EsEntityIndexBatchImpl.java | 40 ++-------
.../index/impl/IndexBatchBufferImpl.java | 90 ++++++++------------
4 files changed, 100 insertions(+), 97 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/420f5009/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
index b24d37b..1d71bcd 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
@@ -34,16 +34,11 @@ public interface IndexBatchBuffer {
/**
* put request into buffer, retu
*
- * @param builder
+ * @param container
*/
- public BetterFuture put(IndexRequestBuilder builder);
+ public BetterFuture put(RequestBuilderContainer container);
+
- /**
- * put request into buffer
- *
- * @param builder
- */
- public BetterFuture put(DeleteRequestBuilder builder);
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/420f5009/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/RequestBuilderContainer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/RequestBuilderContainer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/RequestBuilderContainer.java
new file mode 100644
index 0000000..bc5e115
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/RequestBuilderContainer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.index;
+
+import org.apache.usergrid.persistence.core.future.BetterFuture;
+import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
+
+import java.util.Iterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Classy class class.
+ */
+public class RequestBuilderContainer{
+ private final ConcurrentLinkedQueue<ShardReplicationOperationRequestBuilder> builders;
+ private final BetterFuture<Iterator<ShardReplicationOperationRequestBuilder>> containerFuture;
+
+ public RequestBuilderContainer(){
+ final RequestBuilderContainer parent = this;
+ builders = new ConcurrentLinkedQueue<>();
+ this.containerFuture = new BetterFuture<>(new Callable<Iterator<ShardReplicationOperationRequestBuilder>>() {
+ @Override
+ public Iterator<ShardReplicationOperationRequestBuilder> call() throws Exception {
+ return parent.getBuilder().iterator();
+ }
+ });
+ }
+
+ public void add(ShardReplicationOperationRequestBuilder builder){
+ builders.add(builder);
+ }
+ public ConcurrentLinkedQueue<ShardReplicationOperationRequestBuilder> getBuilder(){
+ return builders;
+ }
+ public void done(){
+ containerFuture.done();
+ }
+ public BetterFuture<Iterator<ShardReplicationOperationRequestBuilder>> getFuture(){
+ return containerFuture;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/420f5009/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index e008707..dc8b1af 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -91,7 +91,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
private final FailureMonitor failureMonitor;
private final AliasedEntityIndex entityIndex;
- private final ConcurrentLinkedQueue<BetterFuture> promises;
+ private final RequestBuilderContainer container;
public EsEntityIndexBatchImpl(final ApplicationScope applicationScope, final Client client,final IndexBatchBuffer indexBatchBuffer,
@@ -106,7 +106,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
this.alias = indexIdentifier.getAlias();
this.refresh = config.isForcedRefresh();
//constrained
- this.promises = new ConcurrentLinkedQueue<>();
+ this.container = new RequestBuilderContainer();
}
@@ -140,7 +140,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
final String entityType = entity.getId().getType();
IndexRequestBuilder builder =
client.prepareIndex(alias.getWriteAlias(), entityType, indexId).setSource( entityAsMap );
- promises.add(indexBatchBuffer.put(builder));
+ container.add(builder);
return this;
}
@@ -186,7 +186,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
public Object call(String index) {
try {
DeleteRequestBuilder builder = client.prepareDelete(index, entityType, indexId).setRefresh(refresh);
- promises.add(indexBatchBuffer.put(builder));
+ container.add(builder);
}catch (Exception e){
log.error("failed to deindex",e);
throw e;
@@ -197,14 +197,12 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
log.debug("Deindexed Entity with index id " + indexId);
-
return this;
}
@Override
public EntityIndexBatch deindex( final IndexScope indexScope, final Entity entity ) {
-
return deindex( indexScope, entity.getId(), entity.getVersion() );
}
@@ -217,40 +215,16 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
@Override
public void execute() {
- flushFutures();
+ indexBatchBuffer.put(container);
}
@Override
public void executeAndRefresh() {
- flushFutures();
+ BetterFuture future = indexBatchBuffer.put(container);
+ future.get();
entityIndex.refresh();
}
- private void flushFutures() {
- ObservableIterator<BetterFuture> iterator = new ObservableIterator<BetterFuture>("futures") {
- @Override
- protected Iterator<BetterFuture> getIterator() {
- return promises.iterator();
- }
- };
- Observable.create(iterator)
- .doOnNext(new Action1<BetterFuture>() {
- @Override
- public void call(BetterFuture betterFuture) {
- betterFuture.get();
- }
- })
- .buffer(100)
- .doOnNext(new Action1<List<BetterFuture>>() {
- @Override
- public void call(List<BetterFuture> betterFutures) {
- promises.removeAll(betterFutures);
- }
- })
- .toBlocking()
- .lastOrDefault(null);
- }
-
/**
* Set the entity as a map with the context
*
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/420f5009/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
index ad14920..364c20a 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
@@ -24,6 +24,7 @@ import org.apache.usergrid.persistence.core.future.BetterFuture;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.index.IndexBatchBuffer;
import org.apache.usergrid.persistence.index.IndexFig;
+import org.apache.usergrid.persistence.index.RequestBuilderContainer;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -38,9 +39,11 @@ import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
+import rx.functions.Func1;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import java.util.concurrent.*;
@@ -96,81 +99,56 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
}
@Override
- public BetterFuture<ShardReplicationOperationRequestBuilder> put(IndexRequestBuilder builder){
- RequestBuilderContainer container = new RequestBuilderContainer(builder);
+ public BetterFuture put(RequestBuilderContainer container){
bufferCounter.inc();
producer.put(container);
return container.getFuture();
}
- @Override
- public BetterFuture<ShardReplicationOperationRequestBuilder> put(DeleteRequestBuilder builder){
- RequestBuilderContainer container = new RequestBuilderContainer(builder);
- bufferCounter.inc();
- producer.put(container);
- return container.getFuture();
- }
-
-
- private static class RequestBuilderContainer{
- private final ShardReplicationOperationRequestBuilder builder;
- private final BetterFuture<ShardReplicationOperationRequestBuilder> containerFuture;
-
- public RequestBuilderContainer(ShardReplicationOperationRequestBuilder builder){
- final RequestBuilderContainer parent = this;
- this.builder = builder;
- this.containerFuture = new BetterFuture<>(new Callable<ShardReplicationOperationRequestBuilder>() {
- @Override
- public ShardReplicationOperationRequestBuilder call() throws Exception {
- return parent.getBuilder();
- }
- });
- }
-
- public ShardReplicationOperationRequestBuilder getBuilder(){
- return builder;
- }
- private void done(){
- containerFuture.done();
- }
- public BetterFuture<ShardReplicationOperationRequestBuilder> getFuture(){
- return containerFuture;
- }
- }
/**
* Execute the request, check for errors, then re-init the batch for future use
*/
- private void execute( boolean refresh) {
+ private void execute(final boolean refresh) {
if (blockingQueue.size() == 0) {
return;
}
- BulkRequestBuilder bulkRequest = initRequest(refresh);
Collection<RequestBuilderContainer> containerCollection = new ArrayList<>(config.getIndexBatchSize());
blockingQueue.drainTo(containerCollection);
- int count = 0;
//clear the queue or proceed to buffersize
- for (RequestBuilderContainer container : containerCollection) {
-
- ShardReplicationOperationRequestBuilder builder = container.getBuilder();
- //only handle two types of requests for now, annoyingly there is no base class implementation on BulkRequest
- if (builder instanceof IndexRequestBuilder) {
- bulkRequest.add((IndexRequestBuilder) builder);
- }
- if (builder instanceof DeleteRequestBuilder) {
- bulkRequest.add((DeleteRequestBuilder) builder);
- }
-
- if (count++ == config.getIndexBatchSize()) {
- sendRequest(bulkRequest);
- bulkRequest = initRequest(refresh);
+ Observable.from(containerCollection)
+ .flatMap(new Func1<RequestBuilderContainer, Observable<ShardReplicationOperationRequestBuilder>>() {
+ @Override
+ public Observable<ShardReplicationOperationRequestBuilder> call(RequestBuilderContainer requestBuilderContainer) {
+ return Observable.from(requestBuilderContainer.getBuilder())
+ .map(new Func1<ShardReplicationOperationRequestBuilder, ShardReplicationOperationRequestBuilder>() {
+ @Override
+ public ShardReplicationOperationRequestBuilder call(ShardReplicationOperationRequestBuilder builder) {
+ return builder;
+ }
+ });
+ }
+ })
+ .buffer(config.getIndexBatchSize())
+ .doOnNext(new Action1<List<ShardReplicationOperationRequestBuilder>>() {
+ @Override
+ public void call(List<ShardReplicationOperationRequestBuilder> builders) {
+ final BulkRequestBuilder bulkRequest = initRequest(refresh);
+ for(ShardReplicationOperationRequestBuilder builder : builders) {
+ if (builder instanceof IndexRequestBuilder) {
+ bulkRequest.add((IndexRequestBuilder) builder);
+ }
+ if (builder instanceof DeleteRequestBuilder) {
+ bulkRequest.add((DeleteRequestBuilder) builder);
+ }
+ sendRequest(bulkRequest);
+ }
+ }
+ }).toBlocking().lastOrDefault(null);
- }
- }
- sendRequest(bulkRequest);
for (RequestBuilderContainer container : containerCollection) {
container.done();
}
[2/2] incubator-usergrid git commit: Add batch of batches
Posted by sf...@apache.org.
Add batch of batches
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/ac55e087
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/ac55e087
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/ac55e087
Branch: refs/heads/USERGRID-273-indexbuffer
Commit: ac55e0878bc5a73deb8e07137f5394cb85b409a0
Parents: 420f500
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Feb 25 09:32:48 2015 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Feb 25 09:32:48 2015 -0700
----------------------------------------------------------------------
.../persistence/index/EntityIndexBatch.java | 3 +-
.../index/RequestBuilderContainer.java | 9 ++++
.../index/impl/EsEntityIndexBatchImpl.java | 19 +++++----
.../index/impl/EsEntityIndexImpl.java | 2 +-
.../index/impl/IndexBatchBufferImpl.java | 44 ++++++++++++--------
.../persistence/index/impl/EntityIndexTest.java | 9 ++--
6 files changed, 55 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac55e087/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
index f5b8abc..f3f9100 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
@@ -20,6 +20,7 @@ package org.apache.usergrid.persistence.index;/*
import java.util.UUID;
+import org.apache.usergrid.persistence.core.future.BetterFuture;
import org.apache.usergrid.persistence.index.query.CandidateResult;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -64,7 +65,7 @@ public interface EntityIndexBatch {
/**
* Execute the batch
*/
- public void execute();
+ public BetterFuture execute();
/**
* Execute the batch and force the refresh
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac55e087/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/RequestBuilderContainer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/RequestBuilderContainer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/RequestBuilderContainer.java
index bc5e115..e7928ff 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/RequestBuilderContainer.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/RequestBuilderContainer.java
@@ -30,6 +30,8 @@ public class RequestBuilderContainer{
private final ConcurrentLinkedQueue<ShardReplicationOperationRequestBuilder> builders;
private final BetterFuture<Iterator<ShardReplicationOperationRequestBuilder>> containerFuture;
+ private boolean forceRefresh = false;
+
public RequestBuilderContainer(){
final RequestBuilderContainer parent = this;
builders = new ConcurrentLinkedQueue<>();
@@ -53,4 +55,11 @@ public class RequestBuilderContainer{
public BetterFuture<Iterator<ShardReplicationOperationRequestBuilder>> getFuture(){
return containerFuture;
}
+
+ public boolean isForceRefresh() {
+ return forceRefresh;
+ }
+ public void setForceRefresh(boolean forceRefresh) {
+ this.forceRefresh = forceRefresh;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac55e087/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index dc8b1af..4fcdeb3 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -88,19 +88,17 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
private final IndexIdentifier indexIdentifier;
private final IndexBatchBuffer indexBatchBuffer;
- private final FailureMonitor failureMonitor;
private final AliasedEntityIndex entityIndex;
- private final RequestBuilderContainer container;
+ private RequestBuilderContainer container;
public EsEntityIndexBatchImpl(final ApplicationScope applicationScope, final Client client,final IndexBatchBuffer indexBatchBuffer,
- final IndexFig config, final FailureMonitor failureMonitor, final AliasedEntityIndex entityIndex ) {
+ final IndexFig config, final AliasedEntityIndex entityIndex ) {
this.applicationScope = applicationScope;
this.client = client;
this.indexBatchBuffer = indexBatchBuffer;
- this.failureMonitor = failureMonitor;
this.entityIndex = entityIndex;
this.indexIdentifier = IndexingUtils.createIndexIdentifier(config, applicationScope);
this.alias = indexIdentifier.getAlias();
@@ -214,15 +212,20 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
}
@Override
- public void execute() {
- indexBatchBuffer.put(container);
+ public BetterFuture execute() {
+ RequestBuilderContainer tempContainer = container;
+ container = new RequestBuilderContainer();
+ BetterFuture future = indexBatchBuffer.put(tempContainer);
+ return future;
}
@Override
public void executeAndRefresh() {
- BetterFuture future = indexBatchBuffer.put(container);
+ container.setForceRefresh(true);
+ RequestBuilderContainer tempContainer = container;
+ container = new RequestBuilderContainer();
+ BetterFuture future = indexBatchBuffer.put(tempContainer);
future.get();
- entityIndex.refresh();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac55e087/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 4e5687f..2881ce4 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -281,7 +281,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
@Override
public EntityIndexBatch createBatch() {
return new EsEntityIndexBatchImpl(
- applicationScope, esProvider.getClient(),indexBatchBuffer, config, failureMonitor, this );
+ applicationScope, esProvider.getClient(),indexBatchBuffer, config, this );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac55e087/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
index 364c20a..b5d9528 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
@@ -40,12 +40,15 @@ import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
+import rx.schedulers.Schedulers;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
/**
@@ -55,24 +58,20 @@ import java.util.concurrent.*;
public class IndexBatchBufferImpl implements IndexBatchBuffer {
private static final Logger log = LoggerFactory.getLogger(IndexBatchBufferImpl.class);
- private final MetricsFactory metricsFactory;
private final Counter indexSizeCounter;
private final Client client;
private final FailureMonitorImpl failureMonitor;
private final IndexFig config;
private final Timer flushTimer;
- private final ArrayBlockingQueue<RequestBuilderContainer> blockingQueue;
private final Counter bufferCounter;
private Observable<List<RequestBuilderContainer>> consumer;
private Producer producer;
@Inject
public IndexBatchBufferImpl(final IndexFig config, final EsProvider provider, final MetricsFactory metricsFactory){
- this.metricsFactory = metricsFactory;
this.flushTimer = metricsFactory.getTimer(IndexBatchBuffer.class, "index.buffer.flush");
this.indexSizeCounter = metricsFactory.getCounter(IndexBatchBuffer.class, "index.buffer.size");
this.config = config;
- this.blockingQueue = new ArrayBlockingQueue<>(config.getIndexBatchSize());
this.failureMonitor = new FailureMonitorImpl(config,provider);
this.producer = new Producer();
this.client = provider.getClient();
@@ -81,18 +80,25 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
}
private void consumer() {
+ final AtomicLong queueSize = new AtomicLong();
//batch up sets of some size and send them in batch
this.consumer = Observable.create(producer)
+ .subscribeOn(Schedulers.io())
+ .doOnNext(new Action1<RequestBuilderContainer>() {
+ @Override
+ public void call(RequestBuilderContainer requestBuilderContainer) {
+ queueSize.addAndGet(requestBuilderContainer.getBuilder().size());
+ }
+ })
.buffer(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, config.getIndexBufferSize())
.doOnNext(new Action1<List<RequestBuilderContainer>>() {
@Override
public void call(List<RequestBuilderContainer> containerList) {
- for (RequestBuilderContainer container : containerList) {
- blockingQueue.add(container);
- }
flushTimer.time();
indexSizeCounter.dec(containerList.size());
- execute(config.isForcedRefresh());
+ if(containerList.size()>0){
+ execute(containerList);
+ }
}
});
consumer.subscribe();
@@ -109,20 +115,22 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
/**
* Execute the request, check for errors, then re-init the batch for future use
*/
- private void execute(final boolean refresh) {
+ private void execute(final List<RequestBuilderContainer> containers) {
- if (blockingQueue.size() == 0) {
+ if (containers == null || containers.size() == 0) {
return;
}
-
- Collection<RequestBuilderContainer> containerCollection = new ArrayList<>(config.getIndexBatchSize());
- blockingQueue.drainTo(containerCollection);
+ final AtomicBoolean isForceRefresh = new AtomicBoolean(config.isForcedRefresh());
//clear the queue or proceed to buffersize
- Observable.from(containerCollection)
+ Observable.from(containers)
+ .subscribeOn(Schedulers.io())
.flatMap(new Func1<RequestBuilderContainer, Observable<ShardReplicationOperationRequestBuilder>>() {
@Override
public Observable<ShardReplicationOperationRequestBuilder> call(RequestBuilderContainer requestBuilderContainer) {
+ if (requestBuilderContainer.isForceRefresh()){
+ isForceRefresh.set(true);
+ }
return Observable.from(requestBuilderContainer.getBuilder())
.map(new Func1<ShardReplicationOperationRequestBuilder, ShardReplicationOperationRequestBuilder>() {
@Override
@@ -136,20 +144,20 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
.doOnNext(new Action1<List<ShardReplicationOperationRequestBuilder>>() {
@Override
public void call(List<ShardReplicationOperationRequestBuilder> builders) {
- final BulkRequestBuilder bulkRequest = initRequest(refresh);
- for(ShardReplicationOperationRequestBuilder builder : builders) {
+ final BulkRequestBuilder bulkRequest = initRequest(isForceRefresh.get());
+ for (ShardReplicationOperationRequestBuilder builder : builders) {
if (builder instanceof IndexRequestBuilder) {
bulkRequest.add((IndexRequestBuilder) builder);
}
if (builder instanceof DeleteRequestBuilder) {
bulkRequest.add((DeleteRequestBuilder) builder);
}
- sendRequest(bulkRequest);
}
+ sendRequest(bulkRequest);
}
}).toBlocking().lastOrDefault(null);
- for (RequestBuilderContainer container : containerCollection) {
+ for (RequestBuilderContainer container : containers) {
container.done();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac55e087/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index 6cda9f2..e3eeaf4 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -467,18 +467,21 @@ public class EntityIndexTest extends BaseIT {
EntityIndexBatch batch = ei.createBatch();
- batch.index( appScope, user ).executeAndRefresh();
+ batch.index( appScope, user );
+ batch.executeAndRefresh();
+
Query query = new Query();
query.addEqualityFilter( "username", "edanuff" );
CandidateResults r = ei.search( appScope, SearchTypes.fromTypes( "edanuff" ), query );
assertEquals( user.getId(), r.get( 0 ).getId() );
- batch.deindex(appScope, user.getId(), user.getVersion() ).executeAndRefresh();
-
+ batch.deindex(appScope, user.getId(), user.getVersion() );
+ batch.executeAndRefresh();
// EntityRef
query = new Query();
query.addEqualityFilter( "username", "edanuff" );
+
r = ei.search(appScope,SearchTypes.fromTypes( "edanuff" ), query );
assertFalse( r.iterator().hasNext() );