You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/02/21 14:36:50 UTC
[1/2] incubator-usergrid git commit: moving towards futures
Repository: incubator-usergrid
Updated Branches:
refs/heads/USERGRID-273-indexbuffer 8659771b1 -> 88435c852
moving towards futures
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/2cd7cfe4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/2cd7cfe4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/2cd7cfe4
Branch: refs/heads/USERGRID-273-indexbuffer
Commit: 2cd7cfe46e826752883a53008bda013ab6396937
Parents: 8659771
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Feb 19 12:25:29 2015 -0800
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Feb 19 12:25:29 2015 -0800
----------------------------------------------------------------------
.../persistence/index/IndexBatchBuffer.java | 25 ++++++++++++++++++--
.../index/impl/EsEntityIndexBatchImpl.java | 25 ++++++++++++++++----
.../index/impl/IndexBatchBufferImpl.java | 23 ++++++++++--------
3 files changed, 57 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2cd7cfe4/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
index 6314bf2..5a3289b 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
@@ -16,11 +16,14 @@
*/
package org.apache.usergrid.persistence.index;
+import org.apache.usergrid.persistence.index.impl.IndexBatchBufferImpl;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import rx.Observable;
+import java.util.concurrent.Callable;
import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
/**
* Buffer for index operations,
@@ -42,13 +45,31 @@ public interface IndexBatchBuffer {
*
* @param builder
*/
- public Observable put(IndexRequestBuilder builder);
+ public IndexBatchBuffer.BetterFuture put(IndexRequestBuilder builder);
/**
* put request into buffer
*
* @param builder
*/
- public Observable put(DeleteRequestBuilder builder);
+ public IndexBatchBuffer.BetterFuture put(DeleteRequestBuilder builder);
+ public static class BetterFuture<T>{
+ FutureTask<T> future;
+ public BetterFuture(Callable<T> callable){
+ future = new FutureTask<>(callable);
+ }
+
+ public void done(){
+ future.run();
+ }
+
+ public T get(){
+ try {
+ return future.get();
+ }catch (Exception e){
+ throw new RuntimeException(e);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2cd7cfe4/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 1bc21d3..f994262 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -27,6 +27,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.util.concurrent.Futures;
import org.apache.usergrid.persistence.index.*;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -93,6 +94,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
private final FailureMonitor failureMonitor;
private final AliasedEntityIndex entityIndex;
+ private final ArrayList<IndexBatchBuffer.BetterFuture> promises;
public EsEntityIndexBatchImpl(final ApplicationScope applicationScope, final Client client,final IndexBatchBuffer indexBatchBuffer,
@@ -106,6 +108,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
this.indexIdentifier = IndexingUtils.createIndexIdentifier(config, applicationScope);
this.alias = indexIdentifier.getAlias();
this.refresh = config.isForcedRefresh();
+ this.promises = new ArrayList<IndexBatchBuffer.BetterFuture>();
}
@@ -141,7 +144,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
final String entityType = entity.getId().getType();
IndexRequestBuilder builder =
client.prepareIndex(alias.getWriteAlias(), entityType, indexId).setSource( entityAsMap );
- indexBatchBuffer.put(builder);
+ promises.add(indexBatchBuffer.put(builder));
return this;
}
@@ -187,7 +190,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
public Object call(String index) {
try {
DeleteRequestBuilder builder = client.prepareDelete(index, entityType, indexId).setRefresh(refresh);
- indexBatchBuffer.put(builder);
+ promises.add(indexBatchBuffer.put(builder));
}catch (Exception e){
log.error("failed to deindex",e);
throw e;
@@ -220,14 +223,28 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
@Override
public void execute() {
- indexBatchBuffer.flush();
+// indexBatchBuffer.flush();
+ Observable.from(promises)
+ .doOnNext(new Action1<IndexBatchBuffer.BetterFuture>() {
+ @Override
+ public void call(IndexBatchBuffer.BetterFuture betterFuture) {
+ betterFuture.get();
+ }
+ }).toBlocking().lastOrDefault(null);
}
@Override
public void executeAndRefresh() {
- indexBatchBuffer.flushAndRefresh();
+// indexBatchBuffer.flushAndRefresh();
+ Observable.from(promises)
+ .doOnNext(new Action1<IndexBatchBuffer.BetterFuture>() {
+ @Override
+ public void call(IndexBatchBuffer.BetterFuture betterFuture) {
+ betterFuture.get();
+ }
+ }).toBlocking().lastOrDefault(null);
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2cd7cfe4/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
index aa4ea17..37b3369 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
@@ -35,9 +35,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
-import rx.Subscription;
import rx.functions.Action1;
-import sun.jvm.hotspot.opto.Block;
import java.util.ArrayList;
import java.util.Collection;
@@ -64,14 +62,14 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
}
- public Observable put(IndexRequestBuilder builder){
+ public BetterFuture put(IndexRequestBuilder builder){
RequestBuilderContainer container = new RequestBuilderContainer(builder);
metricsFactory.getCounter(IndexBatchBuffer.class,"index.buffer.size").inc();
producer.put(container);
return container.getFuture();
}
- public Observable put(DeleteRequestBuilder builder){
+ public BetterFuture put(DeleteRequestBuilder builder){
RequestBuilderContainer container = new RequestBuilderContainer(builder);
metricsFactory.getCounter(IndexBatchBuffer.class,"index.buffer.size").inc();
producer.put(new RequestBuilderContainer(builder));
@@ -214,26 +212,31 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
private static class RequestBuilderContainer{
private final ShardReplicationOperationRequestBuilder builder;
- private final Observable<RequestBuilderContainer> containerFuture;
+ private final BetterFuture<RequestBuilderContainer> containerFuture;
public RequestBuilderContainer(ShardReplicationOperationRequestBuilder builder){
final RequestBuilderContainer parent = this;
this.builder = builder;
this.containerFuture
- = Observable.create(new Observable.OnSubscribe<RequestBuilderContainer>(){
-
- })
+ = new BetterFuture<>(new Callable<RequestBuilderContainer>() {
+ @Override
+ public RequestBuilderContainer call() throws Exception {
+ return parent;
+ }
+ });
}
public ShardReplicationOperationRequestBuilder getBuilder(){
return builder;
}
public void done(){
- containerFuture.();
+ containerFuture.done();
}
- public Observable<RequestBuilderContainer> getFuture(){
+ public BetterFuture<RequestBuilderContainer> getFuture(){
return containerFuture;
}
}
+
+
}
[2/2] incubator-usergrid git commit: Adding Batch Buffer
Posted by sf...@apache.org.
Adding Batch Buffer
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/88435c85
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/88435c85
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/88435c85
Branch: refs/heads/USERGRID-273-indexbuffer
Commit: 88435c8525810dc46e204d6d2254d8e135291f4b
Parents: 2cd7cfe
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Feb 19 17:51:06 2015 -0800
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Feb 19 17:51:06 2015 -0800
----------------------------------------------------------------------
.../persistence/core/future/BetterFuture.java | 42 +++
.../core/future/ObservableFuture.java | 42 +++
.../persistence/index/IndexBatchBuffer.java | 36 +--
.../index/impl/EsEntityIndexBatchImpl.java | 35 ++-
.../index/impl/IndexBatchBufferImpl.java | 260 +++++++++----------
5 files changed, 226 insertions(+), 189 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/88435c85/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java
new file mode 100644
index 0000000..6146fe8
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.core.future;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.FutureTask;
+
+/**
+ * Future without the exception nastiness
+ */
+public class BetterFuture<T>{
+ FutureTask<T> future;
+ public BetterFuture(Callable<T> callable){
+ future = new FutureTask<>(callable);
+ }
+
+ public void done(){
+ future.run();
+ }
+
+ public T get(){
+ try {
+ return future.get();
+ }catch (Exception e){
+ throw new RuntimeException(e);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/88435c85/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/ObservableFuture.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/ObservableFuture.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/ObservableFuture.java
new file mode 100644
index 0000000..d08dd92
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/ObservableFuture.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.core.future;
+
+import rx.Observable;
+import rx.Subscriber;
+
+/**
+ *
+ */
+public class ObservableFuture<T> implements Observable.OnSubscribe<T> {
+
+ private Subscriber<? super T> subscriber;
+
+ @Override
+ public void call(Subscriber<? super T> subscriber) {
+ this.subscriber = subscriber;
+ }
+
+ public void done(T t){
+ this.subscriber.onCompleted();
+ }
+
+ public void emit(T t){
+ this.subscriber.onNext(t);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/88435c85/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
index 5a3289b..b24d37b 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
@@ -16,6 +16,7 @@
*/
package org.apache.usergrid.persistence.index;
+import org.apache.usergrid.persistence.core.future.BetterFuture;
import org.apache.usergrid.persistence.index.impl.IndexBatchBufferImpl;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
@@ -26,50 +27,23 @@ import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
/**
- * Buffer for index operations,
+ * Buffer index requests into sets to send.
*/
public interface IndexBatchBuffer {
/**
- * flush out buffer and execute
- */
- public void flush();
-
- /**
- * flush out buffer and execute
- */
- public void flushAndRefresh();
-
- /**
- * put request into buffer
+ * put request into buffer, returns a future that completes after the buffered request has been sent
*
* @param builder
*/
- public IndexBatchBuffer.BetterFuture put(IndexRequestBuilder builder);
+ public BetterFuture put(IndexRequestBuilder builder);
/**
* put request into buffer
*
* @param builder
*/
- public IndexBatchBuffer.BetterFuture put(DeleteRequestBuilder builder);
-
- public static class BetterFuture<T>{
- FutureTask<T> future;
- public BetterFuture(Callable<T> callable){
- future = new FutureTask<>(callable);
- }
+ public BetterFuture put(DeleteRequestBuilder builder);
- public void done(){
- future.run();
- }
- public T get(){
- try {
- return future.get();
- }catch (Exception e){
- throw new RuntimeException(e);
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/88435c85/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index f994262..896c038 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -18,16 +18,12 @@
*/
package org.apache.usergrid.persistence.index.impl;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.util.concurrent.Futures;
+import org.apache.usergrid.persistence.core.future.BetterFuture;
import org.apache.usergrid.persistence.index.*;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -94,7 +90,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
private final FailureMonitor failureMonitor;
private final AliasedEntityIndex entityIndex;
- private final ArrayList<IndexBatchBuffer.BetterFuture> promises;
+ private final ConcurrentLinkedQueue<BetterFuture> promises;
public EsEntityIndexBatchImpl(final ApplicationScope applicationScope, final Client client,final IndexBatchBuffer indexBatchBuffer,
@@ -108,14 +104,12 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
this.indexIdentifier = IndexingUtils.createIndexIdentifier(config, applicationScope);
this.alias = indexIdentifier.getAlias();
this.refresh = config.isForcedRefresh();
- this.promises = new ArrayList<IndexBatchBuffer.BetterFuture>();
+ this.promises = new ConcurrentLinkedQueue<>();
}
@Override
public EntityIndexBatch index( final IndexScope indexScope, final Entity entity ) {
-
-
IndexValidationUtils.validateIndexScope( indexScope );
ValidationUtils.verifyEntityWrite( entity );
@@ -225,12 +219,13 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
public void execute() {
// indexBatchBuffer.flush();
Observable.from(promises)
- .doOnNext(new Action1<IndexBatchBuffer.BetterFuture>() {
+ .doOnNext(new Action1<BetterFuture>() {
@Override
- public void call(IndexBatchBuffer.BetterFuture betterFuture) {
+ public void call(BetterFuture betterFuture) {
betterFuture.get();
}
}).toBlocking().lastOrDefault(null);
+ promises.clear();
}
@@ -238,13 +233,13 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
@Override
public void executeAndRefresh() {
// indexBatchBuffer.flushAndRefresh();
- Observable.from(promises)
- .doOnNext(new Action1<IndexBatchBuffer.BetterFuture>() {
- @Override
- public void call(IndexBatchBuffer.BetterFuture betterFuture) {
- betterFuture.get();
- }
- }).toBlocking().lastOrDefault(null);
+ Iterator<BetterFuture> iterator = promises.iterator();
+ while(iterator.hasNext()){
+ iterator.next().get();
+ }
+ promises.clear();
+ entityIndex.refresh();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/88435c85/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
index 37b3369..d7e6bf1 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
@@ -20,6 +20,7 @@ import com.codahale.metrics.Counter;
import com.codahale.metrics.Timer;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.core.future.BetterFuture;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.index.IndexBatchBuffer;
import org.apache.usergrid.persistence.index.IndexFig;
@@ -35,6 +36,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
+import rx.functions.Action0;
import rx.functions.Action1;
import java.util.ArrayList;
@@ -44,199 +46,181 @@ import java.util.concurrent.*;
/**
- * Classy class class.
+ * Buffer index requests into sets to send.
*/
@Singleton
public class IndexBatchBufferImpl implements IndexBatchBuffer {
private static final Logger log = LoggerFactory.getLogger(IndexBatchBufferImpl.class);
private final MetricsFactory metricsFactory;
+ private final Counter indexSizeCounter;
+ private final Client client;
+ private final FailureMonitorImpl failureMonitor;
+ private final IndexFig config;
+ private final Timer flushTimer;
+ private final ArrayBlockingQueue<RequestBuilderContainer> blockingQueue;
+ private Observable<List<RequestBuilderContainer>> consumer;
private Producer producer;
- private Consumer consumer;
@Inject
- public IndexBatchBufferImpl(final IndexFig config, final EsProvider provider, MetricsFactory metricsFactory){
+ public IndexBatchBufferImpl(final IndexFig config, final EsProvider provider, final MetricsFactory metricsFactory){
this.metricsFactory = metricsFactory;
+ this.flushTimer = metricsFactory.getTimer(IndexBatchBuffer.class, "index.buffer.flush");
+ this.indexSizeCounter = metricsFactory.getCounter(IndexBatchBuffer.class, "index.buffer.size");
+ this.config = config;
+ this.blockingQueue = new ArrayBlockingQueue<>(config.getIndexBatchSize());
+ this.failureMonitor = new FailureMonitorImpl(config,provider);
this.producer = new Producer();
- this.consumer = new Consumer(config,producer,metricsFactory, provider);
+ this.client = provider.getClient();
+
+ consumer();
}
+ private void consumer() {
+ //batch up sets of some size and send them in batch
+ this.consumer = Observable.create(producer)
+ .buffer(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, config.getIndexBufferSize())
+ .doOnNext(new Action1<List<RequestBuilderContainer>>() {
+ @Override
+ public void call(List<RequestBuilderContainer> containerList) {
+ for (RequestBuilderContainer container : containerList) {
+ blockingQueue.add(container);
+ }
+ flushTimer.time();
+ indexSizeCounter.dec(containerList.size());
+ execute(config.isForcedRefresh());
+ }
+ });
+ consumer.subscribe();
+ }
- public BetterFuture put(IndexRequestBuilder builder){
+ @Override
+ public BetterFuture<ShardReplicationOperationRequestBuilder> put(IndexRequestBuilder builder){
RequestBuilderContainer container = new RequestBuilderContainer(builder);
metricsFactory.getCounter(IndexBatchBuffer.class,"index.buffer.size").inc();
producer.put(container);
return container.getFuture();
}
- public BetterFuture put(DeleteRequestBuilder builder){
+ @Override
+ public BetterFuture<ShardReplicationOperationRequestBuilder> put(DeleteRequestBuilder builder){
RequestBuilderContainer container = new RequestBuilderContainer(builder);
metricsFactory.getCounter(IndexBatchBuffer.class,"index.buffer.size").inc();
- producer.put(new RequestBuilderContainer(builder));
+ producer.put(container);
return container.getFuture();
}
- public void flushAndRefresh(){
- try {
- Thread.sleep(500);
- }catch (Exception e){
-
- }
- }
- public void flush(){
- try {
- Thread.sleep(500);
- }catch (Exception e){
-
- }
- }
- private static class Producer implements Observable.OnSubscribe<RequestBuilderContainer> {
- private Subscriber<? super RequestBuilderContainer> subscriber;
+ private static class RequestBuilderContainer{
+ private final ShardReplicationOperationRequestBuilder builder;
+ private final BetterFuture<ShardReplicationOperationRequestBuilder> containerFuture;
- @Override
- public void call(Subscriber<? super RequestBuilderContainer> subscriber) {
- this.subscriber = subscriber;
+ public RequestBuilderContainer(ShardReplicationOperationRequestBuilder builder){
+ final RequestBuilderContainer parent = this;
+ this.builder = builder;
+ this.containerFuture = new BetterFuture<>(new Callable<ShardReplicationOperationRequestBuilder>() {
+ @Override
+ public ShardReplicationOperationRequestBuilder call() throws Exception {
+ return parent.getBuilder();
+ }
+ });
}
- public void put(RequestBuilderContainer r){
- subscriber.onNext(r);
+ public ShardReplicationOperationRequestBuilder getBuilder(){
+ return builder;
+ }
+ private void done(){
+ containerFuture.done();
+ }
+ public BetterFuture<ShardReplicationOperationRequestBuilder> getFuture(){
+ return containerFuture;
}
}
+ /**
+ * Execute the request, check for errors, then re-init the batch for future use
+ */
+ private void execute( boolean refresh) {
- public static class Consumer {
- private final Observable<List<RequestBuilderContainer>> consumer;
- private final Timer flushTimer;
- private final Counter indexSizeCounter;
- private final BlockingQueue<RequestBuilderContainer> blockingQueue;
- private final Client client;
- private final IndexFig config;
- private final FailureMonitorImpl failureMonitor;
-
- public Consumer(final IndexFig config, Producer producer,MetricsFactory metricsFactory, final EsProvider provider){
- this.config = config;
- this.client = provider.getClient();
- this.failureMonitor = new FailureMonitorImpl(config,provider);
- this.blockingQueue = new ArrayBlockingQueue<>(config.getIndexBatchSize());
- this.flushTimer = metricsFactory.getTimer(IndexBatchBuffer.class, "index.buffer.flush");
- this.indexSizeCounter = metricsFactory.getCounter(IndexBatchBuffer.class, "index.buffer.size");
- this.consumer = Observable.create(producer)
- .buffer(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, config.getIndexBufferSize())
- .doOnNext(new Action1<List<RequestBuilderContainer>>() {
- @Override
- public void call(List<RequestBuilderContainer> containerList) {
- for (RequestBuilderContainer container : containerList) {
- blockingQueue.add(container);
- }
- flushTimer.time();
- indexSizeCounter.dec(containerList.size());
- execute(config.isForcedRefresh());
- }
- });
- consumer.subscribe();
+ if (blockingQueue.size() == 0) {
+ return;
}
- /**
- * Execute the request, check for errors, then re-init the batch for future use
- */
- private void execute(boolean refresh) {
-
- if (blockingQueue.size() == 0) {
- return;
- }
-
- BulkRequestBuilder bulkRequest = initRequest(refresh);
+ BulkRequestBuilder bulkRequest = initRequest(refresh);
- Collection<RequestBuilderContainer> containerCollection = new ArrayList<>(config.getIndexBatchSize());
- blockingQueue.drainTo(containerCollection);
- int count = 0;
- //clear the queue or proceed to buffersize
- for (RequestBuilderContainer container : containerCollection) {
+ Collection<RequestBuilderContainer> containerCollection = new ArrayList<>(config.getIndexBatchSize());
+ blockingQueue.drainTo(containerCollection);
+ int count = 0;
+ //clear the queue or proceed to buffersize
+ for (RequestBuilderContainer container : containerCollection) {
- ShardReplicationOperationRequestBuilder builder = container.getBuilder();
- //only handle two types of requests for now, annoyingly there is no base class implementation on BulkRequest
- if (builder instanceof IndexRequestBuilder) {
- bulkRequest.add((IndexRequestBuilder) builder);
- }
- if (builder instanceof DeleteRequestBuilder) {
- bulkRequest.add((DeleteRequestBuilder) builder);
- }
+ ShardReplicationOperationRequestBuilder builder = container.getBuilder();
+ //only handle two types of requests for now, annoyingly there is no base class implementation on BulkRequest
+ if (builder instanceof IndexRequestBuilder) {
+ bulkRequest.add((IndexRequestBuilder) builder);
+ }
+ if (builder instanceof DeleteRequestBuilder) {
+ bulkRequest.add((DeleteRequestBuilder) builder);
+ }
- if (count++ == config.getIndexBatchSize()) {
- sendRequest(bulkRequest);
- bulkRequest = initRequest(refresh);
+ if (count++ == config.getIndexBatchSize()) {
+ sendRequest(bulkRequest);
+ bulkRequest = initRequest(refresh);
- }
- }
- sendRequest(bulkRequest);
- for (RequestBuilderContainer container : containerCollection) {
- container.done();
}
}
-
- private BulkRequestBuilder initRequest(boolean refresh) {
- BulkRequestBuilder bulkRequest = client.prepareBulk();
- bulkRequest.setConsistencyLevel(WriteConsistencyLevel.fromString(config.getWriteConsistencyLevel()));
- bulkRequest.setRefresh(refresh);
- return bulkRequest;
+ sendRequest(bulkRequest);
+ for (RequestBuilderContainer container : containerCollection) {
+ container.done();
}
+ }
- private void sendRequest(BulkRequestBuilder bulkRequest) {
- //nothing to do, we haven't added anthing to the index
- if (bulkRequest.numberOfActions() == 0) {
- return;
- }
+ private BulkRequestBuilder initRequest(boolean refresh) {
+ BulkRequestBuilder bulkRequest = client.prepareBulk();
+ bulkRequest.setConsistencyLevel(WriteConsistencyLevel.fromString(config.getWriteConsistencyLevel()));
+ bulkRequest.setRefresh(refresh);
+ return bulkRequest;
+ }
- final BulkResponse responses;
+ private void sendRequest(BulkRequestBuilder bulkRequest) {
+ //nothing to do, we haven't added anything to the index
+ if (bulkRequest.numberOfActions() == 0) {
+ return;
+ }
- try {
- responses = bulkRequest.execute().actionGet();
- } catch (Throwable t) {
- log.error("Unable to communicate with elasticsearch");
- failureMonitor.fail("Unable to execute batch", t);
- throw t;
- }
+ final BulkResponse responses;
- failureMonitor.success();
+ try {
+ responses = bulkRequest.execute().actionGet();
+ } catch (Throwable t) {
+ log.error("Unable to communicate with elasticsearch");
+ failureMonitor.fail("Unable to execute batch", t);
+ throw t;
+ }
- for (BulkItemResponse response : responses) {
- if (response.isFailed()) {
- throw new RuntimeException("Unable to index documents. Errors are :"
- + response.getFailure().getMessage());
- }
+ failureMonitor.success();
+
+ for (BulkItemResponse response : responses) {
+ if (response.isFailed()) {
+ throw new RuntimeException("Unable to index documents. Errors are :"
+ + response.getFailure().getMessage());
}
}
-
}
- private static class RequestBuilderContainer{
- private final ShardReplicationOperationRequestBuilder builder;
- private final BetterFuture<RequestBuilderContainer> containerFuture;
- public RequestBuilderContainer(ShardReplicationOperationRequestBuilder builder){
- final RequestBuilderContainer parent = this;
- this.builder = builder;
- this.containerFuture
- = new BetterFuture<>(new Callable<RequestBuilderContainer>() {
- @Override
- public RequestBuilderContainer call() throws Exception {
- return parent;
- }
- });
- }
+ private static class Producer implements Observable.OnSubscribe<RequestBuilderContainer> {
- public ShardReplicationOperationRequestBuilder getBuilder(){
- return builder;
- }
- public void done(){
- containerFuture.done();
+ private Subscriber<? super RequestBuilderContainer> subscriber;
+
+ @Override
+ public void call(Subscriber<? super RequestBuilderContainer> subscriber) {
+ this.subscriber = subscriber;
}
- public BetterFuture<RequestBuilderContainer> getFuture(){
- return containerFuture;
+
+ public void put(RequestBuilderContainer r){
+ subscriber.onNext(r);
}
}
-
-
}