You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/02/17 23:28:26 UTC
incubator-usergrid git commit: adding index buffer
Repository: incubator-usergrid
Updated Branches:
refs/heads/USERGRID-273-indexbuffer [created] db40139ee
adding index buffer
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/db40139e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/db40139e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/db40139e
Branch: refs/heads/USERGRID-273-indexbuffer
Commit: db40139ee4019e8b6d2ff5071afe9ca4f17710d2
Parents: 32d52c3
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Feb 17 14:27:59 2015 -0800
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Feb 17 14:27:59 2015 -0800
----------------------------------------------------------------------
.../persistence/index/IndexBatchBuffer.java | 49 ++++++
.../persistence/index/guice/IndexModule.java | 12 +-
.../index/impl/EsEntityIndexBatchImpl.java | 85 ++--------
.../index/impl/EsEntityIndexImpl.java | 6 +-
.../index/impl/IndexBatchBufferImpl.java | 167 +++++++++++++++++++
5 files changed, 243 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/db40139e/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
new file mode 100644
index 0000000..4a384b4
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.index;
+
+import org.elasticsearch.action.delete.DeleteRequestBuilder;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+
+/**
+ * Buffer that collects index and delete requests so they can be executed together.
+ */
+public interface IndexBatchBuffer {
+
+    /**
+     * Executes the requests currently held in the buffer.
+     */
+    void flush();
+
+    /**
+     * Executes the requests currently held in the buffer and asks for an index refresh.
+     */
+    void flushAndRefresh();
+
+    /**
+     * Adds an index request to the buffer.
+     * @param builder the index request to buffer
+     */
+    void put(IndexRequestBuilder builder);
+
+    /**
+     * Adds a delete request to the buffer.
+     * @param builder the delete request to buffer
+     */
+    void put(DeleteRequestBuilder builder);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/db40139e/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
index edc938b..5af148a 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
@@ -19,12 +19,14 @@
package org.apache.usergrid.persistence.index.guice;
+import org.apache.usergrid.persistence.index.IndexBatchBuffer;
import org.apache.usergrid.persistence.index.IndexFig;
import com.google.inject.AbstractModule;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import org.apache.usergrid.persistence.index.EntityIndex;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.impl.EsEntityIndexImpl;
+import org.apache.usergrid.persistence.index.impl.IndexBatchBufferImpl;
import org.safehaus.guicyfig.GuicyFigModule;
@@ -34,11 +36,13 @@ public class IndexModule extends AbstractModule {
protected void configure() {
// install our configuration
- install (new GuicyFigModule( IndexFig.class ));
+ install(new GuicyFigModule(IndexFig.class));
- install( new FactoryModuleBuilder()
- .implement( EntityIndex.class, EsEntityIndexImpl.class )
- .build( EntityIndexFactory.class ) );
+ install(new FactoryModuleBuilder()
+ .implement(EntityIndex.class, EsEntityIndexImpl.class)
+ .build(EntityIndexFactory.class));
+
+ bind(IndexBatchBuffer.class).to(IndexBatchBufferImpl.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/db40139e/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 44dc692..1bc21d3 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -31,6 +31,8 @@ import org.apache.usergrid.persistence.index.*;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequestBuilder;
+import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -87,29 +89,23 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
private final IndexIdentifier.IndexAlias alias;
private final IndexIdentifier indexIdentifier;
- private BulkRequestBuilder bulkRequest;
-
- private final int autoFlushSize;
-
- private int count;
-
+ private final IndexBatchBuffer indexBatchBuffer;
private final FailureMonitor failureMonitor;
private final AliasedEntityIndex entityIndex;
- public EsEntityIndexBatchImpl( final ApplicationScope applicationScope, final Client client,
- final IndexFig config, final int autoFlushSize, final FailureMonitor failureMonitor, final AliasedEntityIndex entityIndex ) {
+ public EsEntityIndexBatchImpl(final ApplicationScope applicationScope, final Client client,final IndexBatchBuffer indexBatchBuffer,
+ final IndexFig config, final FailureMonitor failureMonitor, final AliasedEntityIndex entityIndex ) {
this.applicationScope = applicationScope;
this.client = client;
+ this.indexBatchBuffer = indexBatchBuffer;
this.failureMonitor = failureMonitor;
this.entityIndex = entityIndex;
this.indexIdentifier = IndexingUtils.createIndexIdentifier(config, applicationScope);
this.alias = indexIdentifier.getAlias();
this.refresh = config.isForcedRefresh();
- this.autoFlushSize = autoFlushSize;
- initBatch();
}
@@ -120,8 +116,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
IndexValidationUtils.validateIndexScope( indexScope );
ValidationUtils.verifyEntityWrite( entity );
- final String context = createContextName( indexScope );
- final String entityType = entity.getId().getType();
+ final String context = createContextName(indexScope);
if ( log.isDebugEnabled() ) {
log.debug( "Indexing entity {}:{}\n alias: {}\n" +
@@ -136,7 +131,6 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
// need prefix here because we index UUIDs as strings
-
// let caller add these fields if needed
// entityAsMap.put("created", entity.getId().getUuid().timestamp();
// entityAsMap.put("updated", entity.getVersion().timestamp());
@@ -144,12 +138,10 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
String indexId = createIndexDocId( entity, context );
log.debug( "Indexing entity documentId {} data {} ", indexId, entityAsMap );
-
- bulkRequest.add( client.prepareIndex(
- alias.getWriteAlias(), entityType, indexId ).setSource( entityAsMap ) );
-
- maybeFlush();
-
+ final String entityType = entity.getId().getType();
+ IndexRequestBuilder builder =
+ client.prepareIndex(alias.getWriteAlias(), entityType, indexId).setSource( entityAsMap );
+ indexBatchBuffer.put(builder);
return this;
}
@@ -194,7 +186,8 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
@Override
public Object call(String index) {
try {
- bulkRequest.add(client.prepareDelete(index, entityType, indexId).setRefresh(refresh));
+ DeleteRequestBuilder builder = client.prepareDelete(index, entityType, indexId).setRefresh(refresh);
+ indexBatchBuffer.put(builder);
}catch (Exception e){
log.error("failed to deindex",e);
throw e;
@@ -205,7 +198,6 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
log.debug( "Deindexed Entity with index id " + indexId );
- maybeFlush();
return this;
}
@@ -228,58 +220,14 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
@Override
public void execute() {
- execute( bulkRequest.setRefresh( refresh ) );
+ indexBatchBuffer.flush();
}
- /**
- * Execute the request, check for errors, then re-init the batch for future use
- */
- private void execute( final BulkRequestBuilder request ) {
-
- //nothing to do, we haven't added anthing to the index
- if ( request.numberOfActions() == 0 ) {
- return;
- }
-
- final BulkResponse responses;
-
- try {
- responses = request.execute().actionGet();
- }
- catch ( Throwable t ) {
- log.error( "Unable to communicate with elasticsearch" );
- failureMonitor.fail( "Unable to execute batch", t );
- throw t;
- }
-
-
- failureMonitor.success();
-
- for ( BulkItemResponse response : responses ) {
- if ( response.isFailed() ) {
- throw new RuntimeException( "Unable to index documents. Errors are :"
- + response.getFailure().getMessage() );
- }
- }
-
- initBatch();
- }
-
@Override
public void executeAndRefresh() {
- execute( bulkRequest.setRefresh( true ) );
- }
-
-
- private void maybeFlush() {
- count++;
-
- if ( count % autoFlushSize == 0 ) {
- execute();
- count = 0;
- }
+ indexBatchBuffer.flushAndRefresh();
}
@@ -422,7 +370,4 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
}
- private void initBatch() {
- this.bulkRequest = client.prepareBulk();
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/db40139e/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 7eb4aa0..4e5687f 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -86,6 +86,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
private final IndexIdentifier.IndexAlias alias;
private final IndexIdentifier indexIdentifier;
+ private final IndexBatchBuffer indexBatchBuffer;
/**
* We purposefully make this per instance. Some indexes may work, while others may fail
@@ -117,7 +118,8 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
@Inject
- public EsEntityIndexImpl( @Assisted final ApplicationScope appScope, final IndexFig config, final EsProvider provider, final EsIndexCache indexCache) {
+ public EsEntityIndexImpl( @Assisted final ApplicationScope appScope, final IndexFig config, final IndexBatchBuffer indexBatchBuffer, final EsProvider provider, final EsIndexCache indexCache) {
+ this.indexBatchBuffer = indexBatchBuffer;
ValidationUtils.validateApplicationScope( appScope );
this.applicationScope = appScope;
this.esProvider = provider;
@@ -279,7 +281,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
@Override
public EntityIndexBatch createBatch() {
return new EsEntityIndexBatchImpl(
- applicationScope, esProvider.getClient(), config, 1000, failureMonitor, this );
+ applicationScope, esProvider.getClient(),indexBatchBuffer, config, failureMonitor, this );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/db40139e/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
new file mode 100644
index 0000000..645c611
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.index.impl;
+
+import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.index.IndexBatchBuffer;
+import org.apache.usergrid.persistence.index.IndexFig;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequestBuilder;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
+import org.elasticsearch.client.Client;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.Subscriber;
+import rx.functions.Action1;
+
+import java.util.List;
+
+
+/**
+ * Classy class class.
+ */
+@Singleton
+public class IndexBatchBufferImpl implements IndexBatchBuffer {
+
+ private static final Logger log = LoggerFactory.getLogger(IndexBatchBufferImpl.class);
+ private final Client client;
+ private final FailureMonitor failureMonitor;
+ private final IndexFig config;
+ private final boolean refresh;
+ private BulkRequestBuilder bulkRequest;
+ private Producer producer;
+
+ public IndexBatchBufferImpl(final Client client, FailureMonitor failureMonitor,final IndexFig config){
+ this.client = client;
+ this.failureMonitor = failureMonitor;
+ this.config = config;
+ this.producer = new Producer();
+ this.refresh = config.isForcedRefresh();
+ init();
+ }
+
+ private void clearBulk() {
+ bulkRequest = client.prepareBulk();
+ }
+
+ private void init() {
+ clearBulk();
+ Observable.create(producer)
+ .buffer(10)
+ .doOnNext(new Action1<List<RequestBuilderContainer>>() {
+ @Override
+ public void call(List<RequestBuilderContainer> builderContainerList) {
+ System.out.println("test test!!!");
+ for (RequestBuilderContainer container : builderContainerList) {
+ ShardReplicationOperationRequestBuilder builder = container.getBuilder();
+ if (builder instanceof IndexRequestBuilder) {
+ bulkRequest.add((IndexRequestBuilder) builder);
+ continue;
+ }
+ if (builder instanceof DeleteRequestBuilder) {
+ bulkRequest.add((DeleteRequestBuilder)builder);
+ continue;
+ }
+ }
+ execute();
+ }
+ })
+ .subscribe();
+ }
+
+ public void put(IndexRequestBuilder builder){
+ producer.put(new RequestBuilderContainer(builder));
+ }
+
+ public void put(DeleteRequestBuilder builder){
+ producer.put(new RequestBuilderContainer(builder));
+ }
+
+ public void flushAndRefresh(){
+ execute(bulkRequest.setRefresh(true));
+ }
+ public void flush(){
+ execute();
+ }
+
+ private void execute(){
+ execute(bulkRequest.setRefresh(refresh));
+ }
+
+ /**
+ * Execute the request, check for errors, then re-init the batch for future use
+ */
+ private void execute( final BulkRequestBuilder request ) {
+ //nothing to do, we haven't added anthing to the index
+ if ( request.numberOfActions() == 0 ) {
+ return;
+ }
+
+ final BulkResponse responses;
+
+ try {
+ responses = request.execute().actionGet();
+ }
+ catch ( Throwable t ) {
+ log.error( "Unable to communicate with elasticsearch" );
+ failureMonitor.fail( "Unable to execute batch", t );
+ throw t;
+ }
+
+ failureMonitor.success();
+
+ for ( BulkItemResponse response : responses ) {
+ if ( response.isFailed() ) {
+ throw new RuntimeException( "Unable to index documents. Errors are :"
+ + response.getFailure().getMessage() );
+ }
+ }
+
+ clearBulk();
+ }
+
+ private static class Producer implements Observable.OnSubscribe<RequestBuilderContainer> {
+
+ private Subscriber<? super RequestBuilderContainer> subscriber;
+
+ @Override
+ public void call(Subscriber<? super RequestBuilderContainer> subscriber) {
+ this.subscriber = subscriber;
+ }
+
+ public void put(RequestBuilderContainer r){
+ subscriber.onNext(r);
+ }
+ }
+
+ private static class RequestBuilderContainer{
+ private final ShardReplicationOperationRequestBuilder builder;
+
+ public RequestBuilderContainer(ShardReplicationOperationRequestBuilder builder){
+ this.builder = builder;
+ }
+
+ public ShardReplicationOperationRequestBuilder getBuilder(){
+ return builder;
+ }
+ }
+
+}