You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/02/26 17:12:01 UTC
incubator-usergrid git commit: switch to producer consumer model
Repository: incubator-usergrid
Updated Branches:
refs/heads/USERGRID-273-indexbuffer f32952e8a -> 77cfeb426
switch to producer consumer model
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/77cfeb42
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/77cfeb42
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/77cfeb42
Branch: refs/heads/USERGRID-273-indexbuffer
Commit: 77cfeb4261aa7a3d9474c076a9ca2c49d8a121eb
Parents: f32952e
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Feb 26 09:11:57 2015 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Feb 26 09:11:57 2015 -0700
----------------------------------------------------------------------
.../persistence/index/IndexBufferConsumer.java | 26 +++
.../persistence/index/IndexBufferProducer.java | 35 ++++
.../persistence/index/guice/IndexModule.java | 11 +-
.../persistence/index/impl/Consumer.java | 188 ------------------
.../index/impl/EsEntityIndexBatchImpl.java | 8 +-
.../index/impl/EsEntityIndexImpl.java | 10 +-
.../index/impl/EsIndexBufferConsumerImpl.java | 191 +++++++++++++++++++
.../index/impl/EsIndexBufferProducerImpl.java | 52 +++++
.../impl/EsIndexBufferProducerObservable.java | 57 ++++++
.../index/impl/IndexBatchBufferImpl.java | 50 -----
.../persistence/index/impl/Producer.java | 54 ------
11 files changed, 374 insertions(+), 308 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/77cfeb42/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferConsumer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferConsumer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferConsumer.java
new file mode 100644
index 0000000..ac7489c
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferConsumer.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.index;
+
+/**
+ * Classy class class.
+ */
+public interface IndexBufferConsumer {
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/77cfeb42/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java
new file mode 100644
index 0000000..c7779a1
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java
@@ -0,0 +1,35 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.index;
+
+import org.apache.usergrid.persistence.core.future.BetterFuture;
+import org.apache.usergrid.persistence.index.IndexOperationMessage;
+import rx.Observable;
+import rx.Subscriber;
+
+/**
+ * Classy class class.
+ */
+public interface IndexBufferProducer {
+
+ Observable.OnSubscribe<IndexOperationMessage> getObservable();
+
+ BetterFuture put(IndexOperationMessage message);
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/77cfeb42/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
index 5af148a..8e39ff7 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
@@ -19,14 +19,12 @@
package org.apache.usergrid.persistence.index.guice;
-import org.apache.usergrid.persistence.index.IndexBatchBuffer;
-import org.apache.usergrid.persistence.index.IndexFig;
+import org.apache.usergrid.persistence.index.*;
import com.google.inject.AbstractModule;
import com.google.inject.assistedinject.FactoryModuleBuilder;
-import org.apache.usergrid.persistence.index.EntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.impl.EsEntityIndexImpl;
-import org.apache.usergrid.persistence.index.impl.IndexBatchBufferImpl;
+import org.apache.usergrid.persistence.index.impl.EsIndexBufferConsumerImpl;
+import org.apache.usergrid.persistence.index.impl.EsIndexBufferProducerImpl;
import org.safehaus.guicyfig.GuicyFigModule;
@@ -42,8 +40,7 @@ public class IndexModule extends AbstractModule {
.implement(EntityIndex.class, EsEntityIndexImpl.class)
.build(EntityIndexFactory.class));
- bind(IndexBatchBuffer.class).to(IndexBatchBufferImpl.class);
-
+ bind(IndexBufferProducer.class).to(EsIndexBufferProducerImpl.class);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/77cfeb42/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/Consumer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/Consumer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/Consumer.java
deleted file mode 100644
index b34a66d..0000000
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/Consumer.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. The ASF licenses this file to You
- * * under the Apache License, Version 2.0 (the "License"); you may not
- * * use this file except in compliance with the License.
- * * You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License. For additional information regarding
- * * copyright in this work, please see the NOTICE file in the top level
- * * directory of this distribution.
- *
- */
-package org.apache.usergrid.persistence.index.impl;
-
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Timer;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.index.IndexBatchBuffer;
-import org.apache.usergrid.persistence.index.IndexFig;
-import org.apache.usergrid.persistence.index.IndexOperationMessage;
-import org.elasticsearch.action.WriteConsistencyLevel;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.delete.DeleteRequestBuilder;
-import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
-import org.elasticsearch.client.Client;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import rx.Observable;
-import rx.functions.Action1;
-import rx.functions.Func1;
-import rx.schedulers.Schedulers;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Consumer for IndexOperationMessages
- */
-public class Consumer {
- private static final Logger log = LoggerFactory.getLogger(Consumer.class);
-
- private final IndexFig config;
- private final FailureMonitorImpl failureMonitor;
- private final Client client;
- private final Observable<List<IndexOperationMessage>> consumer;
- private final Timer flushTimer;
- private final Counter indexSizeCounter;
-
- public Consumer(final IndexFig config,final Producer producer, final EsProvider provider, final MetricsFactory metricsFactory){
- this.flushTimer = metricsFactory.getTimer(IndexBatchBuffer.class, "index.buffer.flush");
- this.indexSizeCounter = metricsFactory.getCounter(IndexBatchBuffer.class, "index.buffer.size");
- this.config = config;
- this.failureMonitor = new FailureMonitorImpl(config,provider);
- this.client = provider.getClient();
-
- final AtomicLong queueSize = new AtomicLong();
- //batch up sets of some size and send them in batch
- this.consumer = Observable.create(producer)
- .subscribeOn(Schedulers.io())
- .doOnNext(new Action1<IndexOperationMessage>() {
- @Override
- public void call(IndexOperationMessage requestBuilderContainer) {
- queueSize.addAndGet(requestBuilderContainer.getBuilder().size());
- }
- })
- .buffer(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, config.getIndexBufferSize())
- .doOnNext(new Action1<List<IndexOperationMessage>>() {
- @Override
- public void call(List<IndexOperationMessage> containerList) {
- flushTimer.time();
- if(containerList.size()>0){
- execute(containerList);
- }
- }
- });
- consumer.subscribe();
- }
-
- /**
- * Execute the request, check for errors, then re-init the batch for future use
- */
- private void execute(final List<IndexOperationMessage> operationMessages) {
-
- if (operationMessages == null || operationMessages.size() == 0) {
- return;
- }
-
- //process and flatten all the messages to builder requests
- Observable<ShardReplicationOperationRequestBuilder> flattenMessages = Observable.from(operationMessages)
- .subscribeOn(Schedulers.io())
- .flatMap(new Func1<IndexOperationMessage, Observable<ShardReplicationOperationRequestBuilder>>() {
- @Override
- public Observable<ShardReplicationOperationRequestBuilder> call(IndexOperationMessage operationMessage) {
- return Observable.from(operationMessage.getBuilder())
- .map(new Func1<ShardReplicationOperationRequestBuilder, ShardReplicationOperationRequestBuilder>() {
- @Override
- public ShardReplicationOperationRequestBuilder call(ShardReplicationOperationRequestBuilder builder) {
- return builder;
- }
- });
- }
- });
-
- //batch shard operations into a bulk request
- flattenMessages
- .buffer(config.getIndexBatchSize())
- .doOnNext(new Action1<List<ShardReplicationOperationRequestBuilder>>() {
- @Override
- public void call(List<ShardReplicationOperationRequestBuilder> builders) {
- final BulkRequestBuilder bulkRequest = initRequest();
- for (ShardReplicationOperationRequestBuilder builder : builders) {
- indexSizeCounter.dec();
- if (builder instanceof IndexRequestBuilder) {
- bulkRequest.add((IndexRequestBuilder) builder);
- }
- if (builder instanceof DeleteRequestBuilder) {
- bulkRequest.add((DeleteRequestBuilder) builder);
- }
- }
- sendRequest(bulkRequest);
- }
- })
- .toBlocking().lastOrDefault(null);
-
- //call back all futures
- Observable.from(operationMessages)
- .subscribeOn(Schedulers.io())
- .doOnNext(new Action1<IndexOperationMessage>() {
- @Override
- public void call(IndexOperationMessage operationMessage) {
- operationMessage.getFuture().done();
- }
- })
- .toBlocking().lastOrDefault(null);
- }
-
- /**
- * initialize request
- * @return
- */
- private BulkRequestBuilder initRequest() {
- BulkRequestBuilder bulkRequest = client.prepareBulk();
- bulkRequest.setConsistencyLevel(WriteConsistencyLevel.fromString(config.getWriteConsistencyLevel()));
- bulkRequest.setRefresh(config.isForcedRefresh());
- return bulkRequest;
- }
-
- /**
- * send bulk request
- * @param bulkRequest
- */
- private void sendRequest(BulkRequestBuilder bulkRequest) {
- //nothing to do, we haven't added anthing to the index
- if (bulkRequest.numberOfActions() == 0) {
- return;
- }
-
- final BulkResponse responses;
-
- try {
- responses = bulkRequest.execute().actionGet();
- } catch (Throwable t) {
- log.error("Unable to communicate with elasticsearch");
- failureMonitor.fail("Unable to execute batch", t);
- throw t;
- }
-
- failureMonitor.success();
-
- for (BulkItemResponse response : responses) {
- if (response.isFailed()) {
- throw new RuntimeException("Unable to index documents. Errors are :"
- + response.getFailure().getMessage());
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/77cfeb42/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 455dba6..2e0fb56 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -76,18 +76,18 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
private final IndexIdentifier.IndexAlias alias;
private final IndexIdentifier indexIdentifier;
- private final IndexBatchBuffer indexBatchBuffer;
+ private final IndexBufferProducer indexBatchBufferProducer;
private final AliasedEntityIndex entityIndex;
private IndexOperationMessage container;
- public EsEntityIndexBatchImpl(final ApplicationScope applicationScope, final Client client,final IndexBatchBuffer indexBatchBuffer,
+ public EsEntityIndexBatchImpl(final ApplicationScope applicationScope, final Client client,final IndexBufferProducer indexBatchBufferProducer,
final IndexFig config, final AliasedEntityIndex entityIndex ) {
this.applicationScope = applicationScope;
this.client = client;
- this.indexBatchBuffer = indexBatchBuffer;
+ this.indexBatchBufferProducer = indexBatchBufferProducer;
this.entityIndex = entityIndex;
this.indexIdentifier = IndexingUtils.createIndexIdentifier(config, applicationScope);
this.alias = indexIdentifier.getAlias();
@@ -204,7 +204,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
public BetterFuture execute() {
IndexOperationMessage tempContainer = container;
container = new IndexOperationMessage();
- return indexBatchBuffer.put(tempContainer);
+ return indexBatchBufferProducer.put(tempContainer);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/77cfeb42/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 693b168..ad638c6 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -87,7 +87,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
private final IndexIdentifier.IndexAlias alias;
private final IndexIdentifier indexIdentifier;
- private final IndexBatchBuffer indexBatchBuffer;
+ private final IndexBufferProducer indexBatchBufferProducer;
/**
* We purposefully make this per instance. Some indexes may work, while others may fail
@@ -119,8 +119,8 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
@Inject
- public EsEntityIndexImpl( @Assisted final ApplicationScope appScope, final IndexFig config, final IndexBatchBuffer indexBatchBuffer, final EsProvider provider, final EsIndexCache indexCache) {
- this.indexBatchBuffer = indexBatchBuffer;
+ public EsEntityIndexImpl( @Assisted final ApplicationScope appScope, final IndexFig config, final IndexBufferProducer indexBatchBufferProducer, final EsProvider provider, final EsIndexCache indexCache) {
+ this.indexBatchBufferProducer = indexBatchBufferProducer;
ValidationUtils.validateApplicationScope( appScope );
this.applicationScope = appScope;
this.esProvider = provider;
@@ -282,7 +282,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
@Override
public EntityIndexBatch createBatch() {
EntityIndexBatch batch = new EsEntityIndexBatchImpl(
- applicationScope, esProvider.getClient(),indexBatchBuffer, config, this );
+ applicationScope, esProvider.getClient(),indexBatchBufferProducer, config, this );
return batch;
}
@@ -434,7 +434,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
public void refresh() {
- BetterFuture future = indexBatchBuffer.put(new IndexOperationMessage());
+ BetterFuture future = indexBatchBufferProducer.put(new IndexOperationMessage());
future.get();
//loop through all batches and retrieve promises and call get
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/77cfeb42/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
new file mode 100644
index 0000000..af61824
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -0,0 +1,191 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.index.impl;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Timer;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.index.IndexBufferConsumer;
+import org.apache.usergrid.persistence.index.IndexBufferProducer;
+import org.apache.usergrid.persistence.index.IndexFig;
+import org.apache.usergrid.persistence.index.IndexOperationMessage;
+import org.elasticsearch.action.WriteConsistencyLevel;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequestBuilder;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
+import org.elasticsearch.client.Client;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Consumer for IndexOperationMessages
+ */
+public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
+ private static final Logger log = LoggerFactory.getLogger(EsIndexBufferConsumerImpl.class);
+
+ private final IndexFig config;
+ private final FailureMonitorImpl failureMonitor;
+ private final Client client;
+ private final Observable<List<IndexOperationMessage>> consumer;
+ private final Timer flushTimer;
+ private final Counter indexSizeCounter;
+
+ public EsIndexBufferConsumerImpl(final IndexFig config, final EsIndexBufferProducerObservable producer, final EsProvider provider, final MetricsFactory metricsFactory){
+ this.flushTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "index.buffer.flush");
+ this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferConsumerImpl.class, "index.buffer.size");
+ this.config = config;
+ this.failureMonitor = new FailureMonitorImpl(config,provider);
+ this.client = provider.getClient();
+
+ final AtomicLong queueSize = new AtomicLong();
+ //batch up sets of some size and send them in batch
+ this.consumer = Observable.create(producer)
+ .subscribeOn(Schedulers.io())
+ .doOnNext(new Action1<IndexOperationMessage>() {
+ @Override
+ public void call(IndexOperationMessage requestBuilderContainer) {
+ queueSize.addAndGet(requestBuilderContainer.getBuilder().size());
+ }
+ })
+ .buffer(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, config.getIndexBufferSize())
+ .doOnNext(new Action1<List<IndexOperationMessage>>() {
+ @Override
+ public void call(List<IndexOperationMessage> containerList) {
+ flushTimer.time();
+ if(containerList.size()>0){
+ execute(containerList);
+ }
+ }
+ });
+ consumer.subscribe();
+ }
+
+ /**
+ * Execute the request, check for errors, then re-init the batch for future use
+ */
+ private void execute(final List<IndexOperationMessage> operationMessages) {
+
+ if (operationMessages == null || operationMessages.size() == 0) {
+ return;
+ }
+
+ //process and flatten all the messages to builder requests
+ Observable<ShardReplicationOperationRequestBuilder> flattenMessages = Observable.from(operationMessages)
+ .subscribeOn(Schedulers.io())
+ .flatMap(new Func1<IndexOperationMessage, Observable<ShardReplicationOperationRequestBuilder>>() {
+ @Override
+ public Observable<ShardReplicationOperationRequestBuilder> call(IndexOperationMessage operationMessage) {
+ return Observable.from(operationMessage.getBuilder())
+ .map(new Func1<ShardReplicationOperationRequestBuilder, ShardReplicationOperationRequestBuilder>() {
+ @Override
+ public ShardReplicationOperationRequestBuilder call(ShardReplicationOperationRequestBuilder builder) {
+ return builder;
+ }
+ });
+ }
+ });
+
+ //batch shard operations into a bulk request
+ flattenMessages
+ .buffer(config.getIndexBatchSize())
+ .doOnNext(new Action1<List<ShardReplicationOperationRequestBuilder>>() {
+ @Override
+ public void call(List<ShardReplicationOperationRequestBuilder> builders) {
+ final BulkRequestBuilder bulkRequest = initRequest();
+ for (ShardReplicationOperationRequestBuilder builder : builders) {
+ indexSizeCounter.dec();
+ if (builder instanceof IndexRequestBuilder) {
+ bulkRequest.add((IndexRequestBuilder) builder);
+ }
+ if (builder instanceof DeleteRequestBuilder) {
+ bulkRequest.add((DeleteRequestBuilder) builder);
+ }
+ }
+ sendRequest(bulkRequest);
+ }
+ })
+ .toBlocking().lastOrDefault(null);
+
+ //call back all futures
+ Observable.from(operationMessages)
+ .subscribeOn(Schedulers.io())
+ .doOnNext(new Action1<IndexOperationMessage>() {
+ @Override
+ public void call(IndexOperationMessage operationMessage) {
+ operationMessage.getFuture().done();
+ }
+ })
+ .toBlocking().lastOrDefault(null);
+ }
+
+ /**
+ * initialize request
+ * @return
+ */
+ private BulkRequestBuilder initRequest() {
+ BulkRequestBuilder bulkRequest = client.prepareBulk();
+ bulkRequest.setConsistencyLevel(WriteConsistencyLevel.fromString(config.getWriteConsistencyLevel()));
+ bulkRequest.setRefresh(config.isForcedRefresh());
+ return bulkRequest;
+ }
+
+ /**
+ * send bulk request
+ * @param bulkRequest
+ */
+ private void sendRequest(BulkRequestBuilder bulkRequest) {
+ //nothing to do, we haven't added anthing to the index
+ if (bulkRequest.numberOfActions() == 0) {
+ return;
+ }
+
+ final BulkResponse responses;
+
+ try {
+ responses = bulkRequest.execute().actionGet();
+ } catch (Throwable t) {
+ log.error("Unable to communicate with elasticsearch");
+ failureMonitor.fail("Unable to execute batch", t);
+ throw t;
+ }
+
+ failureMonitor.success();
+
+ for (BulkItemResponse response : responses) {
+ if (response.isFailed()) {
+ throw new RuntimeException("Unable to index documents. Errors are :"
+ + response.getFailure().getMessage());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/77cfeb42/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
new file mode 100644
index 0000000..715bb84
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.index.impl;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.core.future.BetterFuture;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.index.IndexBufferConsumer;
+import org.apache.usergrid.persistence.index.IndexBufferProducer;
+import org.apache.usergrid.persistence.index.IndexFig;
+import org.apache.usergrid.persistence.index.IndexOperationMessage;
+import rx.Observable;
+
+/**
+ * Buffer index requests into sets to send.
+ */
+@Singleton
+public class EsIndexBufferProducerImpl implements IndexBufferProducer {
+ private final IndexBufferConsumer consumer;
+ private EsIndexBufferProducerObservable producer;
+
+ @Inject
+ public EsIndexBufferProducerImpl(final MetricsFactory metricsFactory, final IndexFig config, final EsProvider provider){
+ this.producer = new EsIndexBufferProducerObservable(metricsFactory);
+ this.consumer = new EsIndexBufferConsumerImpl(config,producer,provider,metricsFactory);
+ }
+
+ @Override
+ public Observable.OnSubscribe<IndexOperationMessage> getObservable() {
+ return producer;
+ }
+
+ @Override
+ public BetterFuture put(IndexOperationMessage container){
+ return producer.put(container);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/77cfeb42/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerObservable.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerObservable.java
new file mode 100644
index 0000000..4a70bca
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerObservable.java
@@ -0,0 +1,57 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.index.impl;
+
+import com.codahale.metrics.Counter;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.core.future.BetterFuture;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.index.IndexOperationMessage;
+import rx.Observable;
+import rx.Subscriber;
+
+/**
+ * Producer for index operation messages
+ */
+@Singleton
+public class EsIndexBufferProducerObservable implements Observable.OnSubscribe<IndexOperationMessage> {
+
+ private final MetricsFactory metricsFactory;
+ private final Counter indexSizeCounter;
+ private Subscriber<? super IndexOperationMessage> subscriber;
+
+ @Inject
+ public EsIndexBufferProducerObservable(MetricsFactory metricsFactory){
+ this.metricsFactory = metricsFactory;
+ this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferProducerObservable.class,"index.buffer.size");
+
+ }
+ @Override
+ public void call(Subscriber<? super IndexOperationMessage> subscriber) {
+ this.subscriber = subscriber;
+ }
+
+ public BetterFuture put(IndexOperationMessage message){
+ indexSizeCounter.inc(message.getBuilder().size());
+ subscriber.onNext(message);
+ return message.getFuture();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/77cfeb42/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
deleted file mode 100644
index dd95de7..0000000
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.usergrid.persistence.index.impl;
-
-import com.codahale.metrics.Counter;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import org.apache.usergrid.persistence.core.future.BetterFuture;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.index.IndexBatchBuffer;
-import org.apache.usergrid.persistence.index.IndexFig;
-import org.apache.usergrid.persistence.index.IndexOperationMessage;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Buffer index requests into sets to send.
- */
-@Singleton
-public class IndexBatchBufferImpl implements IndexBatchBuffer {
- private Consumer consumer;
- private Producer producer;
-
- @Inject
- public IndexBatchBufferImpl(final IndexFig config, final EsProvider provider, final MetricsFactory metricsFactory){
- this.producer = new Producer(metricsFactory);
- this.consumer = new Consumer(config,producer,provider,metricsFactory);
- }
-
- @Override
- public BetterFuture put(IndexOperationMessage container){
- return producer.put(container);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/77cfeb42/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/Producer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/Producer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/Producer.java
deleted file mode 100644
index 06a1138..0000000
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/Producer.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. The ASF licenses this file to You
- * * under the Apache License, Version 2.0 (the "License"); you may not
- * * use this file except in compliance with the License.
- * * You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License. For additional information regarding
- * * copyright in this work, please see the NOTICE file in the top level
- * * directory of this distribution.
- *
- */
-package org.apache.usergrid.persistence.index.impl;
-
-import com.codahale.metrics.Counter;
-import org.apache.usergrid.persistence.core.future.BetterFuture;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.index.IndexBatchBuffer;
-import org.apache.usergrid.persistence.index.IndexOperationMessage;
-import rx.Observable;
-import rx.Subscriber;
-
-/**
- * Producer for index operation messages
- */
-public class Producer implements Observable.OnSubscribe<IndexOperationMessage> {
-
- private final MetricsFactory metricsFactory;
- private final Counter indexSizeCounter;
- private Subscriber<? super IndexOperationMessage> subscriber;
-
- public Producer(MetricsFactory metricsFactory){
- this.metricsFactory = metricsFactory;
- this.indexSizeCounter = metricsFactory.getCounter(IndexBatchBuffer.class,"index.buffer.size");
-
- }
- @Override
- public void call(Subscriber<? super IndexOperationMessage> subscriber) {
- this.subscriber = subscriber;
- }
-
- public BetterFuture put(IndexOperationMessage message){
- indexSizeCounter.inc(message.getBuilder().size());
- subscriber.onNext(message);
- return message.getFuture();
- }
-}