You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/10/01 18:27:44 UTC
[09/36] usergrid git commit: change consumer to producer
change consumer to producer
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/5218cda9
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/5218cda9
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/5218cda9
Branch: refs/heads/master
Commit: 5218cda9d5732ae4ee2ee2ce1e7ae297975a7fad
Parents: a5ece51
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Sep 15 16:17:13 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Sep 24 15:01:20 2015 -0600
----------------------------------------------------------------------
.../persistence/index/guice/IndexModule.java | 2 +-
.../index/impl/EsEntityIndexBatchImpl.java | 4 +-
.../index/impl/EsEntityIndexFactoryImpl.java | 8 +-
.../index/impl/EsEntityIndexImpl.java | 6 +-
.../index/impl/EsIndexBufferConsumerImpl.java | 216 -------------------
.../index/impl/EsIndexProducerImpl.java | 209 ++++++++++++++++++
.../index/impl/IndexBufferConsumer.java | 40 ----
.../persistence/index/impl/IndexProducer.java | 37 ++++
.../index/impl/IndexRefreshCommandImpl.java | 5 +-
9 files changed, 257 insertions(+), 270 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
index 7279174..46559ad 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
@@ -52,7 +52,7 @@ public abstract class IndexModule extends AbstractModule {
bind(IndexCache.class).to(EsIndexCacheImpl.class);
bind(IndexRefreshCommand.class).to(IndexRefreshCommandImpl.class);
- bind(IndexBufferConsumer.class).to(EsIndexBufferConsumerImpl.class).asEagerSingleton();
+ bind(IndexProducer.class).to(EsIndexProducerImpl.class).asEagerSingleton();
//wire up the edg migration. A no-op ATM, but retained for future development
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index c11feed..64a1c6a 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -41,7 +41,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
private final IndexAlias alias;
private final IndexLocationStrategy indexLocationStrategy;
- private final IndexBufferConsumer indexBatchBufferProducer;
+ private final IndexProducer indexBatchBufferProducer;
private final EntityIndex entityIndex;
private final ApplicationScope applicationScope;
@@ -49,7 +49,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
public EsEntityIndexBatchImpl( final IndexLocationStrategy locationStrategy,
- final IndexBufferConsumer indexBatchBufferProducer,
+ final IndexProducer indexBatchBufferProducer,
final EntityIndex entityIndex
) {
this.indexLocationStrategy = locationStrategy;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
index 869d079..b66fd40 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
@@ -37,7 +37,7 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{
private final IndexFig config;
private final IndexCache indexCache;
private final EsProvider provider;
- private final IndexBufferConsumer indexBufferConsumer;
+ private final IndexProducer indexProducer;
private final MetricsFactory metricsFactory;
private final IndexRefreshCommand refreshCommand;
@@ -50,7 +50,7 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{
config,
refreshCommand,
metricsFactory,
- indexBufferConsumer,
+ indexProducer,
locationStrategy
);
index.initialize();
@@ -62,7 +62,7 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{
public EsEntityIndexFactoryImpl( final IndexFig indexFig,
final IndexCache indexCache,
final EsProvider provider,
- final IndexBufferConsumer indexBufferConsumer,
+ final IndexProducer indexProducer,
final MetricsFactory metricsFactory,
final IndexRefreshCommand refreshCommand
@@ -70,7 +70,7 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{
this.config = indexFig;
this.indexCache = indexCache;
this.provider = provider;
- this.indexBufferConsumer = indexBufferConsumer;
+ this.indexProducer = indexProducer;
this.metricsFactory = metricsFactory;
this.refreshCommand = refreshCommand;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 1c63a7b..6317a69 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -63,8 +63,6 @@ import org.elasticsearch.index.query.*;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
-import org.elasticsearch.search.aggregations.Aggregation;
-import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.elasticsearch.search.aggregations.metrics.sum.SumBuilder;
import org.slf4j.Logger;
@@ -118,7 +116,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
private final SearchRequestBuilderStrategyV2 searchRequestBuilderStrategyV2;
private final int cursorTimeout;
private final long queryTimeout;
- private final IndexBufferConsumer indexBatchBufferProducer;
+ private final IndexProducer indexBatchBufferProducer;
private final FailureMonitorImpl failureMonitor;
private final Timer aggregationTimer;
@@ -133,7 +131,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
final IndexFig indexFig,
final IndexRefreshCommand indexRefreshCommand,
final MetricsFactory metricsFactory,
- final IndexBufferConsumer indexBatchBufferProducer,
+ final IndexProducer indexBatchBufferProducer,
final IndexLocationStrategy indexLocationStrategy
) {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
deleted file mode 100644
index d126b5d..0000000
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.index.impl;
-
-
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.codahale.metrics.Histogram;
-import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
-import org.elasticsearch.action.WriteConsistencyLevel;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.client.Client;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.index.IndexFig;
-
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.Timer;
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-import rx.Observable;
-import rx.Subscriber;
-import rx.schedulers.Schedulers;
-
-
-/**
- * Consumer for IndexOperationMessages
- */
-@Singleton
-public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
- private static final Logger log = LoggerFactory.getLogger( EsIndexBufferConsumerImpl.class );
-
- private final IndexFig config;
- private final FailureMonitorImpl failureMonitor;
- private final Client client;
- private final Timer flushTimer;
- private final IndexFig indexFig;
- private final Counter indexSizeCounter;
- private final Histogram roundtripTimer;
- private final Timer indexTimer;
-
-
- private AtomicLong inFlight = new AtomicLong();
-
-
- @Inject
- public EsIndexBufferConsumerImpl( final IndexFig config, final EsProvider provider,
- final MetricsFactory metricsFactory, final IndexFig indexFig ) {
- this.flushTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "index_buffer.flush");
- this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferConsumerImpl.class, "index_buffer.size");
- this.roundtripTimer = metricsFactory.getHistogram(EsIndexBufferConsumerImpl.class, "index_buffer.message_cycle");
-
- //wire up the gauge of inflight messages
- metricsFactory.addGauge(EsIndexBufferConsumerImpl.class, "index_buffer.inflight", () -> inFlight.longValue());
-
-
- this.indexTimer = metricsFactory.getTimer( EsIndexBufferConsumerImpl.class, "index" );
-
- this.config = config;
- this.failureMonitor = new FailureMonitorImpl(config, provider);
- this.client = provider.getClient();
- this.indexFig = indexFig;
-
-
- //batch up sets of some size and send them in batch
-
- }
-
- public Observable<IndexOperationMessage> put( IndexOperationMessage message ) {
- Preconditions.checkNotNull(message, "Message cannot be null");
- indexSizeCounter.inc(message.getDeIndexRequests().size());
- indexSizeCounter.inc(message.getIndexRequests().size());
- return processBatch(message);
- }
-
-
- /**
- * Process the buffer of batches
- * @param batch
- * @return
- */
- private Observable<IndexOperationMessage> processBatch( final IndexOperationMessage batch ) {
-
- //take our stream of batches, then stream then into individual ops for consumption on ES
- final Set<IndexOperation> indexOperationSet = batch.getIndexRequests();
- final Set<DeIndexOperation> deIndexOperationSet = batch.getDeIndexRequests();
-
- final int indexOperationSetSize = indexOperationSet.size();
- final int deIndexOperationSetSize = deIndexOperationSet.size();
-
- log.debug("Emitting {} add and {} remove operations", indexOperationSetSize, deIndexOperationSetSize);
-
- indexSizeCounter.dec(indexOperationSetSize);
- indexSizeCounter.dec(deIndexOperationSetSize);
-
- final Observable<IndexOperation> index = Observable.from(batch.getIndexRequests());
- final Observable<DeIndexOperation> deIndex = Observable.from(batch.getDeIndexRequests());
-
- final Observable<BatchOperation> batchOps = Observable.merge(index, deIndex);
-
- //buffer into the max size we can send ES and fire them all off until we're completed
- final Observable<BulkRequestBuilder> requests = batchOps.buffer(indexFig.getIndexBatchSize())
- //flatten the buffer into a single batch execution
- .flatMap(individualOps -> Observable.from(individualOps)
- //collect them
- .collect(() -> initRequest(), (bulkRequestBuilder, batchOperation) -> {
- log.debug("adding operation {} to bulkRequestBuilder {}", batchOperation, bulkRequestBuilder);
- batchOperation.doOperation(client, bulkRequestBuilder);
- }))
- //write them
- .doOnNext(bulkRequestBuilder -> sendRequest(bulkRequestBuilder));
-
-
- //now that we've processed them all, ack the futures after our last batch comes through
- final Observable<IndexOperationMessage> processedIndexOperations =
- requests.lastOrDefault(null).flatMap(lastRequest -> {
- if (lastRequest != null) {
- return Observable.just(batch);
- } else {
- return Observable.empty();
- }
- });
-
- //subscribe to the operations that generate requests on a new thread so that we can execute them quickly
- //mark this as done
- return processedIndexOperations.doOnNext(processedIndexOp -> {
- roundtripTimer.update(System.currentTimeMillis() - processedIndexOp.getCreationTime());
- });
- }
-
-
- /*
-
- /**
- * initialize request
- */
- private BulkRequestBuilder initRequest() {
- BulkRequestBuilder bulkRequest = client.prepareBulk();
- bulkRequest.setConsistencyLevel( WriteConsistencyLevel.fromString( config.getWriteConsistencyLevel() ) );
- bulkRequest.setRefresh( config.isForcedRefresh() );
- return bulkRequest;
- }
-
-
- /**
- * send bulk request
- */
- private void sendRequest( BulkRequestBuilder bulkRequest ) {
- //nothing to do, we haven't added anything to the index
- if ( bulkRequest.numberOfActions() == 0 ) {
- return;
- }
-
- final BulkResponse responses;
-
-
- final Timer.Context timer = indexTimer.time();
-
- try {
- responses = bulkRequest.execute().actionGet( );
- } catch ( Throwable t ) {
- log.error( "Unable to communicate with elasticsearch" );
- failureMonitor.fail( "Unable to execute batch", t );
- throw t;
- }finally{
- timer.stop();
- }
-
- failureMonitor.success();
-
- boolean error = false;
-
- for ( BulkItemResponse response : responses ) {
-
- if ( response.isFailed() ) {
- // log error and continue processing
- log.error( "Unable to index id={}, type={}, index={}, failureMessage={} ", response.getId(),
- response.getType(), response.getIndex(), response.getFailureMessage() );
-
- error = true;
- }
- }
-
- if ( error ) {
- throw new RuntimeException(
- "Error during processing of bulk index operations one of the responses failed. Check previous log "
- + "entries" );
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
new file mode 100644
index 0000000..a2c8663
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.index.impl;
+
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.codahale.metrics.Histogram;
+import org.elasticsearch.action.WriteConsistencyLevel;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.Client;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.index.IndexFig;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Timer;
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import rx.Observable;
+
+
+/**
+ * Producer that turns each IndexOperationMessage into Elasticsearch bulk requests and executes them
+ */
+@Singleton
+public class EsIndexProducerImpl implements IndexProducer {
+ private static final Logger log = LoggerFactory.getLogger( EsIndexProducerImpl.class );
+
+ private final IndexFig config;
+ private final FailureMonitorImpl failureMonitor;
+ private final Client client;
+ private final Timer flushTimer; // NOTE(review): assigned in the constructor but never read in this class
+ private final IndexFig indexFig;
+ private final Counter indexSizeCounter;
+ private final Histogram roundtripTimer;
+ private final Timer indexTimer;
+
+
+ private AtomicLong inFlight = new AtomicLong();
+
+
+ @Inject
+ public EsIndexProducerImpl(final IndexFig config, final EsProvider provider,
+ final MetricsFactory metricsFactory, final IndexFig indexFig) {
+ this.flushTimer = metricsFactory.getTimer(EsIndexProducerImpl.class, "index_buffer.flush");
+ this.indexSizeCounter = metricsFactory.getCounter(EsIndexProducerImpl.class, "index_buffer.size");
+ this.roundtripTimer = metricsFactory.getHistogram(EsIndexProducerImpl.class, "index_buffer.message_cycle");
+
+ //wire up the gauge of inflight messages; NOTE(review): inFlight is never updated in this class, so the gauge reads its initial value
+ metricsFactory.addGauge(EsIndexProducerImpl.class, "index_buffer.inflight", () -> inFlight.longValue());
+
+
+ this.indexTimer = metricsFactory.getTimer( EsIndexProducerImpl.class, "index" );
+
+ this.config = config;
+ this.failureMonitor = new FailureMonitorImpl(config, provider);
+ this.client = provider.getClient();
+ this.indexFig = indexFig;
+
+
+ //NOTE(review): leftover comment - nothing is batched in the constructor; batching by size happens in processBatch
+
+ }
+
+ public Observable<IndexOperationMessage> put( IndexOperationMessage message ) {
+ Preconditions.checkNotNull(message, "Message cannot be null");
+ indexSizeCounter.inc(message.getDeIndexRequests().size());
+ indexSizeCounter.inc(message.getIndexRequests().size());
+ return processBatch(message);
+ }
+
+
+ /**
+ * Build an observable that groups the message's operations into bulk requests and passes each one to sendRequest
+ * @param batch the message whose index and de-index requests are merged and executed
+ * @return observable that emits the batch after the last bulk request, or completes empty if no bulk request was produced
+ */
+ private Observable<IndexOperationMessage> processBatch( final IndexOperationMessage batch ) {
+
+ //take our stream of batches, then stream them into individual ops for consumption on ES
+ final Set<IndexOperation> indexOperationSet = batch.getIndexRequests();
+ final Set<DeIndexOperation> deIndexOperationSet = batch.getDeIndexRequests();
+
+ final int indexOperationSetSize = indexOperationSet.size();
+ final int deIndexOperationSetSize = deIndexOperationSet.size();
+
+ log.debug("Emitting {} add and {} remove operations", indexOperationSetSize, deIndexOperationSetSize);
+
+ indexSizeCounter.dec(indexOperationSetSize); // NOTE(review): these two dec() calls reverse the inc() calls made in put()
+ indexSizeCounter.dec(deIndexOperationSetSize);
+
+ final Observable<IndexOperation> index = Observable.from(batch.getIndexRequests());
+ final Observable<DeIndexOperation> deIndex = Observable.from(batch.getDeIndexRequests());
+
+ final Observable<BatchOperation> batchOps = Observable.merge(index, deIndex);
+
+ //buffer into the max size we can send ES and fire them all off until we're completed
+ final Observable<BulkRequestBuilder> requests = batchOps.buffer(indexFig.getIndexBatchSize())
+ //flatten the buffer into a single batch execution
+ .flatMap(individualOps -> Observable.from(individualOps)
+ //collect them
+ .collect(() -> initRequest(), (bulkRequestBuilder, batchOperation) -> {
+ log.debug("adding operation {} to bulkRequestBuilder {}", batchOperation, bulkRequestBuilder);
+ batchOperation.doOperation(client, bulkRequestBuilder);
+ }))
+ //write them
+ .doOnNext(bulkRequestBuilder -> sendRequest(bulkRequestBuilder));
+
+
+ //now that we've processed them all, ack the futures after our last batch comes through
+ final Observable<IndexOperationMessage> processedIndexOperations =
+ requests.lastOrDefault(null).flatMap(lastRequest -> {
+ if (lastRequest != null) {
+ return Observable.just(batch);
+ } else {
+ return Observable.empty();
+ }
+ });
+
+ //NOTE(review): no subscribeOn/observeOn is applied in this method, so threading is left to the subscriber - confirm with callers
+ //record the time elapsed since the message's creation time in the roundtrip histogram
+ return processedIndexOperations.doOnNext(processedIndexOp -> {
+ roundtripTimer.update(System.currentTimeMillis() - processedIndexOp.getCreationTime());
+ });
+ }
+
+
+ /**
+ * Initialize a bulk request from the client, applying the write consistency
+ * level and forced-refresh setting read from config
+ * @return the configured BulkRequestBuilder
+ */
+ private BulkRequestBuilder initRequest() {
+ BulkRequestBuilder bulkRequest = client.prepareBulk();
+ bulkRequest.setConsistencyLevel( WriteConsistencyLevel.fromString( config.getWriteConsistencyLevel() ) );
+ bulkRequest.setRefresh( config.isForcedRefresh() );
+ return bulkRequest;
+ }
+
+
+ /**
+ * Execute the bulk request and wait for the response; rethrows any execution failure and throws RuntimeException if any item failed
+ */
+ private void sendRequest( BulkRequestBuilder bulkRequest ) {
+ //nothing to do, we haven't added anything to the index
+ if ( bulkRequest.numberOfActions() == 0 ) {
+ return;
+ }
+
+ final BulkResponse responses;
+
+
+ final Timer.Context timer = indexTimer.time();
+
+ try {
+ responses = bulkRequest.execute().actionGet( );
+ } catch ( Throwable t ) {
+ log.error( "Unable to communicate with elasticsearch" ); // NOTE(review): t is not logged here; it is only handed to failureMonitor.fail and rethrown
+ failureMonitor.fail( "Unable to execute batch", t );
+ throw t;
+ }finally{
+ timer.stop();
+ }
+
+ failureMonitor.success();
+
+ boolean error = false;
+
+ for ( BulkItemResponse response : responses ) {
+
+ if ( response.isFailed() ) {
+ // log every failed item, then fail the whole request once the loop is done
+ log.error( "Unable to index id={}, type={}, index={}, failureMessage={} ", response.getId(),
+ response.getType(), response.getIndex(), response.getFailureMessage() );
+
+ error = true;
+ }
+ }
+
+ if ( error ) {
+ throw new RuntimeException(
+ "Error during processing of bulk index operations one of the responses failed. Check previous log "
+ + "entries" );
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
deleted file mode 100644
index cfeb505..0000000
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.index.impl;
-
-
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
-import rx.Observable;
-
-import java.util.List;
-
-
-/**
- * Buffer index requests
- */
-public interface IndexBufferConsumer {
-
-
- /**
- * Put this operation into our collapsing bufer
- * @param message
- * @return
- */
- Observable<IndexOperationMessage> put(IndexOperationMessage message);
-
-}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexProducer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexProducer.java
new file mode 100644
index 0000000..ba7027e
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexProducer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.index.impl;
+
+
+import rx.Observable;
+
+
+/**
+ * Accepts IndexOperationMessages submitted through put
+ */
+public interface IndexProducer {
+
+
+ /**
+ * Put this operation into our collapsing buffer (NOTE(review): EsIndexProducerImpl.put calls processBatch directly - confirm "buffer" still applies)
+ * @param message the index operation message to submit
+ * @return an observable of IndexOperationMessage; presumably emits the message once it has been processed - confirm with implementations
+ */
+ Observable<IndexOperationMessage> put(IndexOperationMessage message);
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
index 7b9bc5d..01942a8 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
@@ -20,7 +20,6 @@
package org.apache.usergrid.persistence.index.impl;
-import java.util.Collections;
import java.util.Map;
import java.util.UUID;
@@ -58,7 +57,7 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand {
private final IndexCache indexCache;
private final EsProvider esProvider;
- private final IndexBufferConsumer producer;
+ private final IndexProducer producer;
private final IndexFig indexFig;
private final Timer timer;
@@ -66,7 +65,7 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand {
@Inject
public IndexRefreshCommandImpl(
final EsProvider esProvider,
- final IndexBufferConsumer producer,
+ final IndexProducer producer,
final IndexFig indexFig,
final MetricsFactory metricsFactory,
final IndexCache indexCache ) {