You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/03/03 01:22:07 UTC
[19/27] incubator-usergrid git commit: index message comments
index message comments
separate into classes
switch to producer consumer model
removing static class
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/489b2d78
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/489b2d78
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/489b2d78
Branch: refs/heads/USERGRID-280
Commit: 489b2d78da3167f63d447d5eb8b5048e744b51aa
Parents: 0621294
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Feb 25 18:42:29 2015 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Feb 26 09:42:24 2015 -0700
----------------------------------------------------------------------
.../persistence/index/IndexBufferConsumer.java | 26 +++
.../persistence/index/IndexBufferProducer.java | 36 ++++
.../index/IndexOperationMessage.java | 5 +-
.../persistence/index/guice/IndexModule.java | 11 +-
.../index/impl/EsEntityIndexBatchImpl.java | 8 +-
.../index/impl/EsEntityIndexImpl.java | 10 +-
.../index/impl/EsIndexBufferConsumerImpl.java | 193 +++++++++++++++++
.../index/impl/EsIndexBufferProducerImpl.java | 55 +++++
.../index/impl/IndexBatchBufferImpl.java | 206 -------------------
9 files changed, 325 insertions(+), 225 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/489b2d78/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferConsumer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferConsumer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferConsumer.java
new file mode 100644
index 0000000..ac7489c
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferConsumer.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.index;
+
+/**
+ * Marker interface for the consumer side of the index buffer, which drains queued index operation messages.
+ */
+public interface IndexBufferConsumer {
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/489b2d78/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java
new file mode 100644
index 0000000..6338a0c
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.index;
+
+import org.apache.usergrid.persistence.core.future.BetterFuture;
+import org.apache.usergrid.persistence.index.IndexOperationMessage;
+import rx.Observable;
+import rx.Subscriber;
+
+/**
+ * Producer side of the index buffer: accepts index operation messages via put() and emits them to its subscriber.
+ */
+public interface IndexBufferProducer extends Observable.OnSubscribe<IndexOperationMessage> {
+
+ @Override
+ void call(Subscriber<? super IndexOperationMessage> subscriber);
+
+ BetterFuture put(IndexOperationMessage message);
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/489b2d78/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
index 5acb17e..3a0a702 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
@@ -24,7 +24,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
- * Classy class class.
+ * Container for index operations.
*/
public class IndexOperationMessage {
private final ConcurrentLinkedQueue<ShardReplicationOperationRequestBuilder> builders;
@@ -47,9 +47,6 @@ public class IndexOperationMessage {
public ConcurrentLinkedQueue<ShardReplicationOperationRequestBuilder> getBuilder(){
return builders;
}
- public void done(){
- containerFuture.done();
- }
public BetterFuture<IndexOperationMessage> getFuture(){
return containerFuture;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/489b2d78/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
index 5af148a..d9a14c9 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
@@ -19,14 +19,12 @@
package org.apache.usergrid.persistence.index.guice;
-import org.apache.usergrid.persistence.index.IndexBatchBuffer;
-import org.apache.usergrid.persistence.index.IndexFig;
+import org.apache.usergrid.persistence.index.*;
import com.google.inject.AbstractModule;
import com.google.inject.assistedinject.FactoryModuleBuilder;
-import org.apache.usergrid.persistence.index.EntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.impl.EsEntityIndexImpl;
-import org.apache.usergrid.persistence.index.impl.IndexBatchBufferImpl;
+import org.apache.usergrid.persistence.index.impl.EsIndexBufferConsumerImpl;
+import org.apache.usergrid.persistence.index.impl.EsIndexBufferProducerImpl;
import org.safehaus.guicyfig.GuicyFigModule;
@@ -42,7 +40,8 @@ public class IndexModule extends AbstractModule {
.implement(EntityIndex.class, EsEntityIndexImpl.class)
.build(EntityIndexFactory.class));
- bind(IndexBatchBuffer.class).to(IndexBatchBufferImpl.class);
+ bind(IndexBufferProducer.class).to(EsIndexBufferProducerImpl.class);
+ bind(IndexBufferConsumer.class).to(EsIndexBufferConsumerImpl.class).asEagerSingleton();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/489b2d78/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 455dba6..2e0fb56 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -76,18 +76,18 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
private final IndexIdentifier.IndexAlias alias;
private final IndexIdentifier indexIdentifier;
- private final IndexBatchBuffer indexBatchBuffer;
+ private final IndexBufferProducer indexBatchBufferProducer;
private final AliasedEntityIndex entityIndex;
private IndexOperationMessage container;
- public EsEntityIndexBatchImpl(final ApplicationScope applicationScope, final Client client,final IndexBatchBuffer indexBatchBuffer,
+ public EsEntityIndexBatchImpl(final ApplicationScope applicationScope, final Client client,final IndexBufferProducer indexBatchBufferProducer,
final IndexFig config, final AliasedEntityIndex entityIndex ) {
this.applicationScope = applicationScope;
this.client = client;
- this.indexBatchBuffer = indexBatchBuffer;
+ this.indexBatchBufferProducer = indexBatchBufferProducer;
this.entityIndex = entityIndex;
this.indexIdentifier = IndexingUtils.createIndexIdentifier(config, applicationScope);
this.alias = indexIdentifier.getAlias();
@@ -204,7 +204,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
public BetterFuture execute() {
IndexOperationMessage tempContainer = container;
container = new IndexOperationMessage();
- return indexBatchBuffer.put(tempContainer);
+ return indexBatchBufferProducer.put(tempContainer);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/489b2d78/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 693b168..ad638c6 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -87,7 +87,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
private final IndexIdentifier.IndexAlias alias;
private final IndexIdentifier indexIdentifier;
- private final IndexBatchBuffer indexBatchBuffer;
+ private final IndexBufferProducer indexBatchBufferProducer;
/**
* We purposefully make this per instance. Some indexes may work, while others may fail
@@ -119,8 +119,8 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
@Inject
- public EsEntityIndexImpl( @Assisted final ApplicationScope appScope, final IndexFig config, final IndexBatchBuffer indexBatchBuffer, final EsProvider provider, final EsIndexCache indexCache) {
- this.indexBatchBuffer = indexBatchBuffer;
+ public EsEntityIndexImpl( @Assisted final ApplicationScope appScope, final IndexFig config, final IndexBufferProducer indexBatchBufferProducer, final EsProvider provider, final EsIndexCache indexCache) {
+ this.indexBatchBufferProducer = indexBatchBufferProducer;
ValidationUtils.validateApplicationScope( appScope );
this.applicationScope = appScope;
this.esProvider = provider;
@@ -282,7 +282,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
@Override
public EntityIndexBatch createBatch() {
EntityIndexBatch batch = new EsEntityIndexBatchImpl(
- applicationScope, esProvider.getClient(),indexBatchBuffer, config, this );
+ applicationScope, esProvider.getClient(),indexBatchBufferProducer, config, this );
return batch;
}
@@ -434,7 +434,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
public void refresh() {
- BetterFuture future = indexBatchBuffer.put(new IndexOperationMessage());
+ BetterFuture future = indexBatchBufferProducer.put(new IndexOperationMessage());
future.get();
//loop through all batches and retrieve promises and call get
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/489b2d78/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
new file mode 100644
index 0000000..eaca9bd
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -0,0 +1,193 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.index.impl;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Timer;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.index.IndexBufferConsumer;
+import org.apache.usergrid.persistence.index.IndexBufferProducer;
+import org.apache.usergrid.persistence.index.IndexFig;
+import org.apache.usergrid.persistence.index.IndexOperationMessage;
+import org.elasticsearch.action.WriteConsistencyLevel;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequestBuilder;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
+import org.elasticsearch.client.Client;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Consumer for IndexOperationMessages
+ */
+@Singleton
+public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
+ private static final Logger log = LoggerFactory.getLogger(EsIndexBufferConsumerImpl.class);
+
+ private final IndexFig config;
+ private final FailureMonitorImpl failureMonitor;
+ private final Client client;
+ private final Observable<List<IndexOperationMessage>> consumer;
+ private final Timer flushTimer;
+ private final Counter indexSizeCounter;
+
+ @Inject
+ public EsIndexBufferConsumerImpl(final IndexFig config, final IndexBufferProducer producer, final EsProvider provider, final MetricsFactory metricsFactory){
+ this.flushTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "index.buffer.flush");
+ this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferConsumerImpl.class, "index.buffer.size");
+ this.config = config;
+ this.failureMonitor = new FailureMonitorImpl(config,provider);
+ this.client = provider.getClient();
+
+ final AtomicLong queueSize = new AtomicLong();
+ //buffer incoming messages until the configured timeout elapses or the buffer size is reached, then send them as one batch
+ this.consumer = Observable.create(producer)
+ .subscribeOn(Schedulers.io())
+ .doOnNext(new Action1<IndexOperationMessage>() {
+ @Override
+ public void call(IndexOperationMessage requestBuilderContainer) {
+ queueSize.addAndGet(requestBuilderContainer.getBuilder().size());
+ }
+ })
+ .buffer(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, config.getIndexBufferSize())
+ .doOnNext(new Action1<List<IndexOperationMessage>>() {
+ @Override
+ public void call(List<IndexOperationMessage> containerList) {
+ flushTimer.time();
+ if(containerList.size()>0){
+ execute(containerList);
+ }
+ }
+ });
+ consumer.subscribe();
+ }
+
+ /**
+ * Flatten the buffered messages into bulk requests, send them, then mark each message's future as done
+ */
+ private void execute(final List<IndexOperationMessage> operationMessages) {
+
+ if (operationMessages == null || operationMessages.size() == 0) {
+ return;
+ }
+
+ //process and flatten all the messages to builder requests
+ Observable<ShardReplicationOperationRequestBuilder> flattenMessages = Observable.from(operationMessages)
+ .subscribeOn(Schedulers.io())
+ .flatMap(new Func1<IndexOperationMessage, Observable<ShardReplicationOperationRequestBuilder>>() {
+ @Override
+ public Observable<ShardReplicationOperationRequestBuilder> call(IndexOperationMessage operationMessage) {
+ return Observable.from(operationMessage.getBuilder())
+ .map(new Func1<ShardReplicationOperationRequestBuilder, ShardReplicationOperationRequestBuilder>() {
+ @Override
+ public ShardReplicationOperationRequestBuilder call(ShardReplicationOperationRequestBuilder builder) {
+ return builder;
+ }
+ });
+ }
+ });
+
+ //batch shard operations into a bulk request
+ flattenMessages
+ .buffer(config.getIndexBatchSize())
+ .doOnNext(new Action1<List<ShardReplicationOperationRequestBuilder>>() {
+ @Override
+ public void call(List<ShardReplicationOperationRequestBuilder> builders) {
+ final BulkRequestBuilder bulkRequest = initRequest();
+ for (ShardReplicationOperationRequestBuilder builder : builders) {
+ indexSizeCounter.dec();
+ if (builder instanceof IndexRequestBuilder) {
+ bulkRequest.add((IndexRequestBuilder) builder);
+ }
+ if (builder instanceof DeleteRequestBuilder) {
+ bulkRequest.add((DeleteRequestBuilder) builder);
+ }
+ }
+ sendRequest(bulkRequest);
+ }
+ })
+ .toBlocking().lastOrDefault(null);
+
+ //call back all futures
+ Observable.from(operationMessages)
+ .subscribeOn(Schedulers.io())
+ .doOnNext(new Action1<IndexOperationMessage>() {
+ @Override
+ public void call(IndexOperationMessage operationMessage) {
+ operationMessage.getFuture().done();
+ }
+ })
+ .toBlocking().lastOrDefault(null);
+ }
+
+ /**
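+ * Prepare a bulk request using the write consistency level and forced-refresh setting from the index config
+ * @return the configured bulk request builder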
+ */
+ private BulkRequestBuilder initRequest() {
+ BulkRequestBuilder bulkRequest = client.prepareBulk();
+ bulkRequest.setConsistencyLevel(WriteConsistencyLevel.fromString(config.getWriteConsistencyLevel()));
+ bulkRequest.setRefresh(config.isForcedRefresh());
+ return bulkRequest;
+ }
+
+ /**
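+ * Send the bulk request, report the outcome to the failure monitor, and throw if any item in the response failed
+ * @param bulkRequest the bulk request to execute; it is skipped when it contains no actions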
+ */
+ private void sendRequest(BulkRequestBuilder bulkRequest) {
+ //nothing to do, we haven't added anything to the index
+ if (bulkRequest.numberOfActions() == 0) {
+ return;
+ }
+
+ final BulkResponse responses;
+
+ try {
+ responses = bulkRequest.execute().actionGet();
+ } catch (Throwable t) {
+ log.error("Unable to communicate with elasticsearch");
+ failureMonitor.fail("Unable to execute batch", t);
+ throw t;
+ }
+
+ failureMonitor.success();
+
+ for (BulkItemResponse response : responses) {
+ if (response.isFailed()) {
+ throw new RuntimeException("Unable to index documents. Errors are :"
+ + response.getFailure().getMessage());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/489b2d78/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
new file mode 100644
index 0000000..791cea8
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
@@ -0,0 +1,55 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.index.impl;
+
+import com.codahale.metrics.Counter;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.core.future.BetterFuture;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.index.IndexBufferProducer;
+import org.apache.usergrid.persistence.index.IndexOperationMessage;
+import rx.Subscriber;
+
+/**
+ * Producer for index operation messages
+ */
+@Singleton
+public class EsIndexBufferProducerImpl implements IndexBufferProducer {
+
+ private final Counter indexSizeCounter;
+ private Subscriber<? super IndexOperationMessage> subscriber;
+
+ @Inject
+ public EsIndexBufferProducerImpl(MetricsFactory metricsFactory){
+ this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferProducerImpl.class,"index.buffer.size");
+
+ }
+ @Override
+ public void call(Subscriber<? super IndexOperationMessage> subscriber) {
+ this.subscriber = subscriber;
+ }
+
+ public BetterFuture put(IndexOperationMessage message){
+ indexSizeCounter.inc(message.getBuilder().size());
+ subscriber.onNext(message);
+ return message.getFuture();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/489b2d78/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
deleted file mode 100644
index 92ab582..0000000
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.usergrid.persistence.index.impl;
-
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Timer;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import org.apache.usergrid.persistence.core.future.BetterFuture;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.index.IndexBatchBuffer;
-import org.apache.usergrid.persistence.index.IndexFig;
-import org.apache.usergrid.persistence.index.IndexOperationMessage;
-import org.elasticsearch.action.WriteConsistencyLevel;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.delete.DeleteRequestBuilder;
-import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
-import org.elasticsearch.client.Client;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import rx.Observable;
-import rx.Subscriber;
-import rx.functions.Action1;
-import rx.functions.Func1;
-import rx.schedulers.Schedulers;
-
-import java.util.List;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-
-/**
- * Buffer index requests into sets to send.
- */
-@Singleton
-public class IndexBatchBufferImpl implements IndexBatchBuffer {
-
- private static final Logger log = LoggerFactory.getLogger(IndexBatchBufferImpl.class);
- private final Counter indexSizeCounter;
- private final Client client;
- private final FailureMonitorImpl failureMonitor;
- private final IndexFig config;
- private final Timer flushTimer;
- private final Counter bufferCounter;
- private Observable<List<IndexOperationMessage>> consumer;
- private Producer producer;
-
- @Inject
- public IndexBatchBufferImpl(final IndexFig config, final EsProvider provider, final MetricsFactory metricsFactory){
- this.flushTimer = metricsFactory.getTimer(IndexBatchBuffer.class, "index.buffer.flush");
- this.indexSizeCounter = metricsFactory.getCounter(IndexBatchBuffer.class, "index.buffer.size");
- this.config = config;
- this.failureMonitor = new FailureMonitorImpl(config,provider);
- this.producer = new Producer();
- this.client = provider.getClient();
- bufferCounter = metricsFactory.getCounter(IndexBatchBuffer.class,"index.buffer.size");
- consumer();
- }
-
- private void consumer() {
- final AtomicLong queueSize = new AtomicLong();
- //batch up sets of some size and send them in batch
- this.consumer = Observable.create(producer)
- .subscribeOn(Schedulers.io())
- .doOnNext(new Action1<IndexOperationMessage>() {
- @Override
- public void call(IndexOperationMessage requestBuilderContainer) {
- queueSize.addAndGet(requestBuilderContainer.getBuilder().size());
- }
- })
- .buffer(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, config.getIndexBufferSize())
- .doOnNext(new Action1<List<IndexOperationMessage>>() {
- @Override
- public void call(List<IndexOperationMessage> containerList) {
- flushTimer.time();
- indexSizeCounter.dec(containerList.size());
- if(containerList.size()>0){
- execute(containerList);
- }
- }
- });
- consumer.subscribe();
- }
-
- @Override
- public BetterFuture put(IndexOperationMessage container){
- bufferCounter.inc();
- producer.put(container);
- return container.getFuture();
- }
-
-
- /**
- * Execute the request, check for errors, then re-init the batch for future use
- */
- private void execute(final List<IndexOperationMessage> containers) {
-
- if (containers == null || containers.size() == 0) {
- return;
- }
-
- final AtomicBoolean isForceRefresh = new AtomicBoolean(config.isForcedRefresh());
- //clear the queue or proceed to buffer size
- Observable.from(containers)
- .subscribeOn(Schedulers.io())
- .flatMap(new Func1<IndexOperationMessage, Observable<ShardReplicationOperationRequestBuilder>>() {
- @Override
- public Observable<ShardReplicationOperationRequestBuilder> call(IndexOperationMessage requestBuilderContainer) {
- return Observable.from(requestBuilderContainer.getBuilder())
- .map(new Func1<ShardReplicationOperationRequestBuilder, ShardReplicationOperationRequestBuilder>() {
- @Override
- public ShardReplicationOperationRequestBuilder call(ShardReplicationOperationRequestBuilder builder) {
- return builder;
- }
- });
- }
- })
- .buffer(config.getIndexBatchSize())
- .doOnNext(new Action1<List<ShardReplicationOperationRequestBuilder>>() {
- @Override
- public void call(List<ShardReplicationOperationRequestBuilder> builders) {
- final BulkRequestBuilder bulkRequest = initRequest(isForceRefresh.get());
- for (ShardReplicationOperationRequestBuilder builder : builders) {
- if (builder instanceof IndexRequestBuilder) {
- bulkRequest.add((IndexRequestBuilder) builder);
- }
- if (builder instanceof DeleteRequestBuilder) {
- bulkRequest.add((DeleteRequestBuilder) builder);
- }
- }
- sendRequest(bulkRequest);
- }
- }).toBlocking().lastOrDefault(null);
-
- for (IndexOperationMessage container : containers) {
- container.done();
- }
- }
-
- private BulkRequestBuilder initRequest(boolean refresh) {
- BulkRequestBuilder bulkRequest = client.prepareBulk();
- bulkRequest.setConsistencyLevel(WriteConsistencyLevel.fromString(config.getWriteConsistencyLevel()));
- bulkRequest.setRefresh(refresh);
- return bulkRequest;
- }
-
- private void sendRequest(BulkRequestBuilder bulkRequest) {
- //nothing to do, we haven't added anthing to the index
- if (bulkRequest.numberOfActions() == 0) {
- return;
- }
-
- final BulkResponse responses;
-
- try {
- responses = bulkRequest.execute().actionGet();
- } catch (Throwable t) {
- log.error("Unable to communicate with elasticsearch");
- failureMonitor.fail("Unable to execute batch", t);
- throw t;
- }
-
- failureMonitor.success();
-
- for (BulkItemResponse response : responses) {
- if (response.isFailed()) {
- throw new RuntimeException("Unable to index documents. Errors are :"
- + response.getFailure().getMessage());
- }
- }
- }
-
-
- private static class Producer implements Observable.OnSubscribe<IndexOperationMessage> {
-
- private Subscriber<? super IndexOperationMessage> subscriber;
-
- @Override
- public void call(Subscriber<? super IndexOperationMessage> subscriber) {
- this.subscriber = subscriber;
- }
-
- public void put(IndexOperationMessage r){
- subscriber.onNext(r);
- }
- }
-
-}