Posted to dev@usergrid.apache.org by shawnfeldman <gi...@git.apache.org> on 2015/02/18 02:03:20 UTC

[GitHub] incubator-usergrid pull request: Usergrid 273 indexbuffer

GitHub user shawnfeldman opened a pull request:

    https://github.com/apache/incubator-usergrid/pull/161

    Usergrid 273 indexbuffer

    buffer all indexing calls into sets of 100
    bring metrics factory to cp
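
The size-or-timeout buffering described above can be sketched without RxJava. This is only an illustration using `java.util.concurrent`: `BatchDrainer` and `nextBatch` are hypothetical names, not code from this pull request, and the batch size of 100 is taken from the description.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration; not part of the pull request.
final class BatchDrainer {

    /**
     * Collects up to maxSize items, waiting at most timeoutMillis in total.
     * Returns whatever was gathered by then, possibly an empty list.
     */
    static <T> List<T> nextBatch(BlockingQueue<T> queue, int maxSize, long timeoutMillis) {
        List<T> batch = new ArrayList<>(maxSize);
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (batch.size() < maxSize) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // time budget used up
            }
            T item;
            try {
                item = queue.poll(remaining, TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                break;
            }
            if (item == null) {
                break; // nothing arrived before the deadline
            }
            batch.add(item);
            // Take anything already queued without waiting again.
            queue.drainTo(batch, maxSize - batch.size());
        }
        return batch;
    }
}
```

A flusher thread could call `nextBatch(queue, 100, timeout)` in a loop and send each non-empty batch as one bulk request.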

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/apache/incubator-usergrid USERGRID-273-indexbuffer

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-usergrid/pull/161.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #161
    
----
commit db40139ee4019e8b6d2ff5071afe9ca4f17710d2
Author: Shawn Feldman <sf...@apache.org>
Date:   2015-02-17T22:27:59Z

    adding index buffer

commit 7e0e0be85d2cb65990ca4f8f4dfcd09330f47a04
Author: Shawn Feldman <sf...@apache.org>
Date:   2015-02-17T23:35:25Z

    Adding metrics to cp

commit e478257513fc193d0c1dce97fab6368e080bdeeb
Author: Shawn Feldman <sf...@apache.org>
Date:   2015-02-18T01:01:34Z

    add blocking queue

commit bf63116a342830feb20157d818a4446d811682cc
Author: Shawn Feldman <sf...@apache.org>
Date:   2015-02-18T01:01:41Z

    Merge branch 'USERGRID-273' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into USERGRID-273-indexbuffer

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-usergrid pull request: Usergrid 273 indexbuffer

Posted by shawnfeldman <gi...@git.apache.org>.
Github user shawnfeldman commented on a diff in the pull request:

    https://github.com/apache/incubator-usergrid/pull/161#discussion_r25301622
  
    --- Diff: stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java ---
    @@ -0,0 +1,226 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.usergrid.persistence.index.impl;
    +
    +import com.codahale.metrics.Counter;
    +import com.codahale.metrics.Timer;
    +import com.google.inject.Inject;
    +import com.google.inject.Singleton;
    +import org.apache.usergrid.persistence.core.future.BetterFuture;
    +import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
    +import org.apache.usergrid.persistence.index.IndexBatchBuffer;
    +import org.apache.usergrid.persistence.index.IndexFig;
    +import org.elasticsearch.action.WriteConsistencyLevel;
    +import org.elasticsearch.action.bulk.BulkItemResponse;
    +import org.elasticsearch.action.bulk.BulkRequestBuilder;
    +import org.elasticsearch.action.bulk.BulkResponse;
    +import org.elasticsearch.action.delete.DeleteRequestBuilder;
    +import org.elasticsearch.action.index.IndexRequestBuilder;
    +import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
    +import org.elasticsearch.client.Client;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import rx.Observable;
    +import rx.Subscriber;
    +import rx.functions.Action0;
    +import rx.functions.Action1;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.concurrent.*;
    +
    +
    +/**
    + * Buffer index requests into sets to send.
    + */
    +@Singleton
    +public class IndexBatchBufferImpl implements IndexBatchBuffer {
    +
    +    private static final Logger log = LoggerFactory.getLogger(IndexBatchBufferImpl.class);
    +    private final MetricsFactory metricsFactory;
    +    private final Counter indexSizeCounter;
    +    private final Client client;
    +    private final FailureMonitorImpl failureMonitor;
    +    private final IndexFig config;
    +    private final Timer flushTimer;
    +    private final ArrayBlockingQueue<RequestBuilderContainer> blockingQueue;
    +    private Observable<List<RequestBuilderContainer>> consumer;
    +    private Producer producer;
    +
    +    @Inject
    +    public IndexBatchBufferImpl(final IndexFig config, final EsProvider provider, final MetricsFactory metricsFactory){
    +        this.metricsFactory = metricsFactory;
    +        this.flushTimer = metricsFactory.getTimer(IndexBatchBuffer.class, "index.buffer.flush");
    +        this.indexSizeCounter =  metricsFactory.getCounter(IndexBatchBuffer.class, "index.buffer.size");
    +        this.config = config;
    +        this.blockingQueue = new ArrayBlockingQueue<>(config.getIndexBatchSize());
    +        this.failureMonitor = new FailureMonitorImpl(config,provider);
    +        this.producer = new Producer();
    +        this.client = provider.getClient();
    +
    +        consumer();
    +    }
    +
    +    private void consumer() {
    +        //batch up sets of some size and send them in batch
    +        this.consumer = Observable.create(producer)
    +                .buffer(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, config.getIndexBufferSize())
    +                .doOnNext(new Action1<List<RequestBuilderContainer>>() {
    +                    @Override
    +                    public void call(List<RequestBuilderContainer> containerList) {
    +                        for (RequestBuilderContainer container : containerList) {
    +                            blockingQueue.add(container);
    +                        }
    +                        flushTimer.time();
    +                        indexSizeCounter.dec(containerList.size());
    +                        execute(config.isForcedRefresh());
    +                    }
    +                });
    +        consumer.subscribe();
    +    }
    +
    +    @Override
    +    public BetterFuture<ShardReplicationOperationRequestBuilder> put(IndexRequestBuilder builder){
    +        RequestBuilderContainer container = new RequestBuilderContainer(builder);
    +        metricsFactory.getCounter(IndexBatchBuffer.class,"index.buffer.size").inc();
    --- End diff --
    
    so there is no performance hit



[GitHub] incubator-usergrid pull request: Usergrid 273 indexbuffer

Posted by tnine <gi...@git.apache.org>.
Github user tnine commented on a diff in the pull request:

    https://github.com/apache/incubator-usergrid/pull/161#discussion_r25295392
  
    --- Diff: stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java ---
    @@ -228,58 +217,29 @@ public EntityIndexBatch deindex( final IndexScope indexScope, final CandidateRes
     
         @Override
         public void execute() {
    -        execute( bulkRequest.setRefresh( refresh ) );
    +//        indexBatchBuffer.flush();
    +        Observable.from(promises)
    --- End diff --
    
    See previous comment.  I think you can just do
    
    ```
    Observable.from(promises).toBlocking().lastOrDefault(null)
    ```



[GitHub] incubator-usergrid pull request: Usergrid 273 indexbuffer

Posted by tnine <gi...@git.apache.org>.
Github user tnine commented on a diff in the pull request:

    https://github.com/apache/incubator-usergrid/pull/161#discussion_r25295292
  
    --- Diff: stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java ---
    @@ -87,41 +86,34 @@
         private final IndexIdentifier.IndexAlias alias;
         private final IndexIdentifier indexIdentifier;
     
    -    private BulkRequestBuilder bulkRequest;
    -
    -    private final int autoFlushSize;
    -
    -    private int count;
    -
    +    private final IndexBatchBuffer indexBatchBuffer;
         private final FailureMonitor failureMonitor;
     
         private final AliasedEntityIndex entityIndex;
    +    private final ConcurrentLinkedQueue<BetterFuture> promises;
     
     
    -    public EsEntityIndexBatchImpl( final ApplicationScope applicationScope, final Client client,
    -            final IndexFig config, final int autoFlushSize, final FailureMonitor failureMonitor, final AliasedEntityIndex entityIndex ) {
    +    public EsEntityIndexBatchImpl(final ApplicationScope applicationScope, final Client client,final IndexBatchBuffer indexBatchBuffer,
    +            final IndexFig config, final FailureMonitor failureMonitor, final AliasedEntityIndex entityIndex ) {
     
             this.applicationScope = applicationScope;
             this.client = client;
    +        this.indexBatchBuffer = indexBatchBuffer;
             this.failureMonitor = failureMonitor;
             this.entityIndex = entityIndex;
             this.indexIdentifier = IndexingUtils.createIndexIdentifier(config, applicationScope);
             this.alias = indexIdentifier.getAlias();
             this.refresh = config.isForcedRefresh();
    -        this.autoFlushSize = autoFlushSize;
    -        initBatch();
    +        this.promises = new ConcurrentLinkedQueue<>();
    --- End diff --
    
    Do we want to set an upper bound on this in case we can't flush fast enough?
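
One way to put an upper bound on the pending promises is to swap the unbounded queue for a bounded one and make producers wait briefly for a free slot. The sketch below is an assumption about how that could look: `BoundedPromises`, `track`, and the capacity parameter are hypothetical and do not appear in the pull request.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; class and method names are not from the pull request.
final class BoundedPromises<T> {
    private final BlockingQueue<T> pending;

    BoundedPromises(int capacity) {
        // A LinkedBlockingQueue with a capacity rejects or blocks once it is full.
        this.pending = new LinkedBlockingQueue<>(capacity);
    }

    /** Returns false if no slot became free within waitMillis, so the caller can flush or fail. */
    boolean track(T promise, long waitMillis) {
        try {
            return pending.offer(promise, waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Removes and returns the oldest tracked promise, or null if none is pending. */
    T next() {
        return pending.poll();
    }

    int size() {
        return pending.size();
    }
}
```

When `track` returns false the producer knows the flush side is falling behind, which gives it a natural point to block, flush synchronously, or report an error instead of growing memory without limit.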



[GitHub] incubator-usergrid pull request: Usergrid 273 indexbuffer

Posted by shawnfeldman <gi...@git.apache.org>.
Github user shawnfeldman closed the pull request at:

    https://github.com/apache/incubator-usergrid/pull/161



[GitHub] incubator-usergrid pull request: Usergrid 273 indexbuffer

Posted by shawnfeldman <gi...@git.apache.org>.
Github user shawnfeldman commented on a diff in the pull request:

    https://github.com/apache/incubator-usergrid/pull/161#discussion_r25300900
  
    --- Diff: stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/ObservableFuture.java ---
    @@ -0,0 +1,42 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.usergrid.persistence.core.future;
    +
    +import rx.Observable;
    +import rx.Subscriber;
    +
    +/**
    + *
    + */
    +public class ObservableFuture<T> implements Observable.OnSubscribe<T> {
    --- End diff --
    
    Will remove. It was a holdover from when I was trying to do pure observables.



[GitHub] incubator-usergrid pull request: Usergrid 273 indexbuffer

Posted by tnine <gi...@git.apache.org>.
Github user tnine commented on a diff in the pull request:

    https://github.com/apache/incubator-usergrid/pull/161#discussion_r25295586
  
    --- Diff: stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java ---
    @@ -0,0 +1,226 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.usergrid.persistence.index.impl;
    +
    +import com.codahale.metrics.Counter;
    +import com.codahale.metrics.Timer;
    +import com.google.inject.Inject;
    +import com.google.inject.Singleton;
    +import org.apache.usergrid.persistence.core.future.BetterFuture;
    +import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
    +import org.apache.usergrid.persistence.index.IndexBatchBuffer;
    +import org.apache.usergrid.persistence.index.IndexFig;
    +import org.elasticsearch.action.WriteConsistencyLevel;
    +import org.elasticsearch.action.bulk.BulkItemResponse;
    +import org.elasticsearch.action.bulk.BulkRequestBuilder;
    +import org.elasticsearch.action.bulk.BulkResponse;
    +import org.elasticsearch.action.delete.DeleteRequestBuilder;
    +import org.elasticsearch.action.index.IndexRequestBuilder;
    +import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
    +import org.elasticsearch.client.Client;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import rx.Observable;
    +import rx.Subscriber;
    +import rx.functions.Action0;
    +import rx.functions.Action1;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.concurrent.*;
    +
    +
    +/**
    + * Buffer index requests into sets to send.
    + */
    +@Singleton
    +public class IndexBatchBufferImpl implements IndexBatchBuffer {
    +
    +    private static final Logger log = LoggerFactory.getLogger(IndexBatchBufferImpl.class);
    +    private final MetricsFactory metricsFactory;
    +    private final Counter indexSizeCounter;
    +    private final Client client;
    +    private final FailureMonitorImpl failureMonitor;
    +    private final IndexFig config;
    +    private final Timer flushTimer;
    +    private final ArrayBlockingQueue<RequestBuilderContainer> blockingQueue;
    +    private Observable<List<RequestBuilderContainer>> consumer;
    +    private Producer producer;
    +
    +    @Inject
    +    public IndexBatchBufferImpl(final IndexFig config, final EsProvider provider, final MetricsFactory metricsFactory){
    +        this.metricsFactory = metricsFactory;
    +        this.flushTimer = metricsFactory.getTimer(IndexBatchBuffer.class, "index.buffer.flush");
    +        this.indexSizeCounter =  metricsFactory.getCounter(IndexBatchBuffer.class, "index.buffer.size");
    +        this.config = config;
    +        this.blockingQueue = new ArrayBlockingQueue<>(config.getIndexBatchSize());
    +        this.failureMonitor = new FailureMonitorImpl(config,provider);
    +        this.producer = new Producer();
    +        this.client = provider.getClient();
    +
    +        consumer();
    +    }
    +
    +    private void consumer() {
    +        //batch up sets of some size and send them in batch
    +        this.consumer = Observable.create(producer)
    +                .buffer(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, config.getIndexBufferSize())
    +                .doOnNext(new Action1<List<RequestBuilderContainer>>() {
    +                    @Override
    +                    public void call(List<RequestBuilderContainer> containerList) {
    +                        for (RequestBuilderContainer container : containerList) {
    +                            blockingQueue.add(container);
    +                        }
    +                        flushTimer.time();
    +                        indexSizeCounter.dec(containerList.size());
    +                        execute(config.isForcedRefresh());
    +                    }
    +                });
    +        consumer.subscribe();
    +    }
    +
    +    @Override
    +    public BetterFuture<ShardReplicationOperationRequestBuilder> put(IndexRequestBuilder builder){
    +        RequestBuilderContainer container = new RequestBuilderContainer(builder);
    +        metricsFactory.getCounter(IndexBatchBuffer.class,"index.buffer.size").inc();
    --- End diff --
    
    Can we declare these in the constructor so we don't take the performance hit of the get on every put? 
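
The suggestion amounts to looking the counter up once and keeping the reference. The quoted constructor already stores a counter under the same class and name in `indexSizeCounter`, so `put` could presumably increment that field. The sketch below shows the pattern with hypothetical stand-ins (`CounterRegistry`, `BufferWithCachedCounter`); it does not use the real `MetricsFactory` or Codahale `Counter` types.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a metrics registry that caches counters by name.
final class CounterRegistry {
    private final ConcurrentMap<String, AtomicLong> counters = new ConcurrentHashMap<>();

    AtomicLong getCounter(String name) {
        // A map lookup on every call; cheap, but not free on a hot path.
        return counters.computeIfAbsent(name, key -> new AtomicLong());
    }
}

// Hypothetical buffer that resolves its counter once, in the constructor.
final class BufferWithCachedCounter {
    private final AtomicLong sizeCounter;

    BufferWithCachedCounter(CounterRegistry registry) {
        this.sizeCounter = registry.getCounter("index.buffer.size");
    }

    void put(Object request) {
        // No registry lookup here, only an increment on the cached reference.
        sizeCounter.incrementAndGet();
    }
}
```

Because the registry hands back the same counter instance for a given name, the cached field and a later lookup by name observe the same value.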



[GitHub] incubator-usergrid pull request: Usergrid 273 indexbuffer

Posted by tnine <gi...@git.apache.org>.
Github user tnine commented on a diff in the pull request:

    https://github.com/apache/incubator-usergrid/pull/161#discussion_r25294946
  
    --- Diff: stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/ObservableFuture.java ---
    @@ -0,0 +1,42 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.usergrid.persistence.core.future;
    +
    +import rx.Observable;
    +import rx.Subscriber;
    +
    +/**
    + *
    + */
    +public class ObservableFuture<T> implements Observable.OnSubscribe<T> {
    --- End diff --
    
    Do we need this class?  Can't we just use Observable.from(Future<T>)?



[GitHub] incubator-usergrid pull request: Usergrid 273 indexbuffer

Posted by shawnfeldman <gi...@git.apache.org>.
Github user shawnfeldman commented on a diff in the pull request:

    https://github.com/apache/incubator-usergrid/pull/161#discussion_r25302433
  
    --- Diff: stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java ---
    @@ -228,58 +217,29 @@ public EntityIndexBatch deindex( final IndexScope indexScope, final CandidateRes
     
         @Override
         public void execute() {
    -        execute( bulkRequest.setRefresh( refresh ) );
    +//        indexBatchBuffer.flush();
    +        Observable.from(promises)
    --- End diff --
    
    I need to resolve all the promises, though. I'm not sure that just resolving the observable will work without explicitly calling get.
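
For comparison, waiting on every promise explicitly can be written with plain `java.util.concurrent`. This sketch assumes the promises behave like `java.util.concurrent.Future`, which this thread does not confirm for `BetterFuture`; `PromiseDrain` and `awaitAll` are hypothetical names.

```java
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical helper; assumes the queued promises implement java.util.concurrent.Future.
final class PromiseDrain {

    /** Blocks until every queued future has completed and returns how many were resolved. */
    static int awaitAll(Queue<? extends Future<?>> promises) {
        int resolved = 0;
        Future<?> promise;
        while ((promise = promises.poll()) != null) {
            try {
                promise.get(); // explicit get: this is what actually waits for the result
                resolved++;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for a promise", e);
            } catch (ExecutionException e) {
                throw new IllegalStateException("a queued request failed", e.getCause());
            }
        }
        return resolved;
    }
}
```

Draining with `poll` also empties the queue, so the same batch object could be reused after `execute()` returns.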



[GitHub] incubator-usergrid pull request: Usergrid 273 indexbuffer

Posted by shawnfeldman <gi...@git.apache.org>.
Github user shawnfeldman commented on a diff in the pull request:

    https://github.com/apache/incubator-usergrid/pull/161#discussion_r25301597
  
    --- Diff: stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java ---
    @@ -0,0 +1,226 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.usergrid.persistence.index.impl;
    +
    +import com.codahale.metrics.Counter;
    +import com.codahale.metrics.Timer;
    +import com.google.inject.Inject;
    +import com.google.inject.Singleton;
    +import org.apache.usergrid.persistence.core.future.BetterFuture;
    +import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
    +import org.apache.usergrid.persistence.index.IndexBatchBuffer;
    +import org.apache.usergrid.persistence.index.IndexFig;
    +import org.elasticsearch.action.WriteConsistencyLevel;
    +import org.elasticsearch.action.bulk.BulkItemResponse;
    +import org.elasticsearch.action.bulk.BulkRequestBuilder;
    +import org.elasticsearch.action.bulk.BulkResponse;
    +import org.elasticsearch.action.delete.DeleteRequestBuilder;
    +import org.elasticsearch.action.index.IndexRequestBuilder;
    +import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
    +import org.elasticsearch.client.Client;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import rx.Observable;
    +import rx.Subscriber;
    +import rx.functions.Action0;
    +import rx.functions.Action1;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.concurrent.*;
    +
    +
    +/**
    + * Buffer index requests into sets to send.
    + */
    +@Singleton
    +public class IndexBatchBufferImpl implements IndexBatchBuffer {
    +
    +    private static final Logger log = LoggerFactory.getLogger(IndexBatchBufferImpl.class);
    +    private final MetricsFactory metricsFactory;
    +    private final Counter indexSizeCounter;
    +    private final Client client;
    +    private final FailureMonitorImpl failureMonitor;
    +    private final IndexFig config;
    +    private final Timer flushTimer;
    +    private final ArrayBlockingQueue<RequestBuilderContainer> blockingQueue;
    +    private Observable<List<RequestBuilderContainer>> consumer;
    +    private Producer producer;
    +
    +    @Inject
    +    public IndexBatchBufferImpl(final IndexFig config, final EsProvider provider, final MetricsFactory metricsFactory){
    +        this.metricsFactory = metricsFactory;
    +        this.flushTimer = metricsFactory.getTimer(IndexBatchBuffer.class, "index.buffer.flush");
    +        this.indexSizeCounter =  metricsFactory.getCounter(IndexBatchBuffer.class, "index.buffer.size");
    +        this.config = config;
    +        this.blockingQueue = new ArrayBlockingQueue<>(config.getIndexBatchSize());
    +        this.failureMonitor = new FailureMonitorImpl(config,provider);
    +        this.producer = new Producer();
    +        this.client = provider.getClient();
    +
    +        consumer();
    +    }
    +
    +    private void consumer() {
    +        //batch up sets of some size and send them in batch
    +        this.consumer = Observable.create(producer)
    +                .buffer(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, config.getIndexBufferSize())
    +                .doOnNext(new Action1<List<RequestBuilderContainer>>() {
    +                    @Override
    +                    public void call(List<RequestBuilderContainer> containerList) {
    +                        for (RequestBuilderContainer container : containerList) {
    +                            blockingQueue.add(container);
    +                        }
    +                        flushTimer.time();
    +                        indexSizeCounter.dec(containerList.size());
    +                        execute(config.isForcedRefresh());
    +                    }
    +                });
    +        consumer.subscribe();
    +    }
    +
    +    @Override
    +    public BetterFuture<ShardReplicationOperationRequestBuilder> put(IndexRequestBuilder builder){
    +        RequestBuilderContainer container = new RequestBuilderContainer(builder);
    +        metricsFactory.getCounter(IndexBatchBuffer.class,"index.buffer.size").inc();
    --- End diff --
    
    We can. I did this because I knew all the keys were cached in a singleton hashmap.

