You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/02/17 23:28:26 UTC

incubator-usergrid git commit: adding index buffer

Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-273-indexbuffer [created] db40139ee


adding index buffer


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/db40139e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/db40139e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/db40139e

Branch: refs/heads/USERGRID-273-indexbuffer
Commit: db40139ee4019e8b6d2ff5071afe9ca4f17710d2
Parents: 32d52c3
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Feb 17 14:27:59 2015 -0800
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Feb 17 14:27:59 2015 -0800

----------------------------------------------------------------------
 .../persistence/index/IndexBatchBuffer.java     |  49 ++++++
 .../persistence/index/guice/IndexModule.java    |  12 +-
 .../index/impl/EsEntityIndexBatchImpl.java      |  85 ++--------
 .../index/impl/EsEntityIndexImpl.java           |   6 +-
 .../index/impl/IndexBatchBufferImpl.java        | 167 +++++++++++++++++++
 5 files changed, 243 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/db40139e/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
new file mode 100644
index 0000000..4a384b4
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.index;
+
+import org.elasticsearch.action.delete.DeleteRequestBuilder;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+
+/**
+ * Buffer for Elasticsearch index operations: callers queue index and delete requests and flush them on demand.
+ */
+public interface IndexBatchBuffer {
+
+    /**
+     * Flush out the buffer and execute the pending requests.
+     */
+    public void flush();
+
+    /**
+     * Flush out the buffer and execute the pending requests, asking for a refresh as part of the execution.
+     */
+    public void flushAndRefresh();
+
+    /**
+     * Put an index request into the buffer.
+     * @param builder the index request to buffer
+     */
+    public void put(IndexRequestBuilder builder);
+
+    /**
+     * Put a delete request into the buffer.
+     * @param builder the delete request to buffer
+     */
+    public void put(DeleteRequestBuilder builder);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/db40139e/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
index edc938b..5af148a 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
@@ -19,12 +19,14 @@
 
 package org.apache.usergrid.persistence.index.guice;
 
+import org.apache.usergrid.persistence.index.IndexBatchBuffer;
 import org.apache.usergrid.persistence.index.IndexFig;
 import com.google.inject.AbstractModule;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import org.apache.usergrid.persistence.index.EntityIndex;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.impl.EsEntityIndexImpl;
+import org.apache.usergrid.persistence.index.impl.IndexBatchBufferImpl;
 import org.safehaus.guicyfig.GuicyFigModule;
 
 
@@ -34,11 +36,13 @@ public class IndexModule extends AbstractModule {
     protected void configure() {
 
         // install our configuration
-        install (new GuicyFigModule( IndexFig.class ));
+        install(new GuicyFigModule(IndexFig.class));
 
-        install( new FactoryModuleBuilder()
-            .implement( EntityIndex.class, EsEntityIndexImpl.class )
-            .build( EntityIndexFactory.class ) );
+        install(new FactoryModuleBuilder()
+                .implement(EntityIndex.class, EsEntityIndexImpl.class)
+                .build(EntityIndexFactory.class));
+
+        bind(IndexBatchBuffer.class).to(IndexBatchBufferImpl.class);
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/db40139e/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 44dc692..1bc21d3 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -31,6 +31,8 @@ import org.apache.usergrid.persistence.index.*;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequestBuilder;
+import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.client.Client;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -87,29 +89,23 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
     private final IndexIdentifier.IndexAlias alias;
     private final IndexIdentifier indexIdentifier;
 
-    private BulkRequestBuilder bulkRequest;
-
-    private final int autoFlushSize;
-
-    private int count;
-
+    private final IndexBatchBuffer indexBatchBuffer;
     private final FailureMonitor failureMonitor;
 
     private final AliasedEntityIndex entityIndex;
 
 
-    public EsEntityIndexBatchImpl( final ApplicationScope applicationScope, final Client client,
-            final IndexFig config, final int autoFlushSize, final FailureMonitor failureMonitor, final AliasedEntityIndex entityIndex ) {
+    public EsEntityIndexBatchImpl(final ApplicationScope applicationScope, final Client client,final IndexBatchBuffer indexBatchBuffer,
+            final IndexFig config, final FailureMonitor failureMonitor, final AliasedEntityIndex entityIndex ) {
 
         this.applicationScope = applicationScope;
         this.client = client;
+        this.indexBatchBuffer = indexBatchBuffer;
         this.failureMonitor = failureMonitor;
         this.entityIndex = entityIndex;
         this.indexIdentifier = IndexingUtils.createIndexIdentifier(config, applicationScope);
         this.alias = indexIdentifier.getAlias();
         this.refresh = config.isForcedRefresh();
-        this.autoFlushSize = autoFlushSize;
-        initBatch();
     }
 
 
@@ -120,8 +116,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
         IndexValidationUtils.validateIndexScope( indexScope );
         ValidationUtils.verifyEntityWrite( entity );
 
-        final String context = createContextName( indexScope );
-        final String entityType = entity.getId().getType();
+        final String context = createContextName(indexScope);
 
         if ( log.isDebugEnabled() ) {
             log.debug( "Indexing entity {}:{}\n   alias: {}\n" +
@@ -136,7 +131,6 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
         // need prefix here because we index UUIDs as strings
 
-
         // let caller add these fields if needed
         // entityAsMap.put("created", entity.getId().getUuid().timestamp();
         // entityAsMap.put("updated", entity.getVersion().timestamp());
@@ -144,12 +138,10 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
         String indexId = createIndexDocId( entity, context );
 
         log.debug( "Indexing entity documentId {} data {} ", indexId, entityAsMap );
-
-        bulkRequest.add( client.prepareIndex(
-                alias.getWriteAlias(), entityType, indexId ).setSource( entityAsMap ) );
-
-        maybeFlush();
-
+        final String entityType = entity.getId().getType();
+        IndexRequestBuilder builder =
+                client.prepareIndex(alias.getWriteAlias(), entityType, indexId).setSource( entityAsMap );
+        indexBatchBuffer.put(builder);
         return this;
     }
 
@@ -194,7 +186,8 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
                    @Override
                    public Object call(String index) {
                        try {
-                           bulkRequest.add(client.prepareDelete(index, entityType, indexId).setRefresh(refresh));
+                           DeleteRequestBuilder builder = client.prepareDelete(index, entityType, indexId).setRefresh(refresh);
+                           indexBatchBuffer.put(builder);
                        }catch (Exception e){
                            log.error("failed to deindex",e);
                            throw e;
@@ -205,7 +198,6 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
         log.debug( "Deindexed Entity with index id " + indexId );
 
-        maybeFlush();
 
         return this;
     }
@@ -228,58 +220,14 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
     @Override
     public void execute() {
-        execute( bulkRequest.setRefresh( refresh ) );
+        indexBatchBuffer.flush();
     }
 
 
-    /**
-     * Execute the request, check for errors, then re-init the batch for future use
-     */
-    private void execute( final BulkRequestBuilder request ) {
-
-        //nothing to do, we haven't added anthing to the index
-        if ( request.numberOfActions() == 0 ) {
-            return;
-        }
-
-        final BulkResponse responses;
-
-        try {
-            responses = request.execute().actionGet();
-        }
-        catch ( Throwable t ) {
-            log.error( "Unable to communicate with elasticsearch" );
-            failureMonitor.fail( "Unable to execute batch", t );
-            throw t;
-        }
-
-
-        failureMonitor.success();
-
-        for ( BulkItemResponse response : responses ) {
-            if ( response.isFailed() ) {
-                throw new RuntimeException( "Unable to index documents.  Errors are :"
-                        + response.getFailure().getMessage() );
-            }
-        }
-
-        initBatch();
-    }
-
 
     @Override
     public void executeAndRefresh() {
-        execute( bulkRequest.setRefresh( true ) );
-    }
-
-
-    private void maybeFlush() {
-        count++;
-
-        if ( count % autoFlushSize == 0 ) {
-            execute();
-            count = 0;
-        }
+        indexBatchBuffer.flushAndRefresh();
     }
 
 
@@ -422,7 +370,4 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
     }
 
 
-    private void initBatch() {
-        this.bulkRequest = client.prepareBulk();
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/db40139e/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 7eb4aa0..4e5687f 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -86,6 +86,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
 
     private final IndexIdentifier.IndexAlias alias;
     private final IndexIdentifier indexIdentifier;
+    private final IndexBatchBuffer indexBatchBuffer;
 
     /**
      * We purposefully make this per instance. Some indexes may work, while others may fail
@@ -117,7 +118,8 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
 
 
     @Inject
-    public EsEntityIndexImpl( @Assisted final ApplicationScope appScope, final IndexFig config, final EsProvider provider, final EsIndexCache indexCache) {
+    public EsEntityIndexImpl( @Assisted final ApplicationScope appScope, final IndexFig config, final IndexBatchBuffer indexBatchBuffer, final EsProvider provider, final EsIndexCache indexCache) {
+        this.indexBatchBuffer = indexBatchBuffer;
         ValidationUtils.validateApplicationScope( appScope );
         this.applicationScope = appScope;
         this.esProvider = provider;
@@ -279,7 +281,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
     @Override
     public EntityIndexBatch createBatch() {
         return new EsEntityIndexBatchImpl(
-                applicationScope, esProvider.getClient(), config, 1000, failureMonitor, this );
+                applicationScope, esProvider.getClient(),indexBatchBuffer, config, failureMonitor, this );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/db40139e/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
new file mode 100644
index 0000000..645c611
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.index.impl;
+
+import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.index.IndexBatchBuffer;
+import org.apache.usergrid.persistence.index.IndexFig;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequestBuilder;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
+import org.elasticsearch.client.Client;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.Subscriber;
+import rx.functions.Action1;
+
+import java.util.List;
+
+
+/**
+ * {@link IndexBatchBuffer} that feeds queued index/delete builders through an Rx stream, adds them to one bulk request in groups of 10, and executes it.
+ */
+@Singleton
+public class IndexBatchBufferImpl implements IndexBatchBuffer {
+
+    private static final Logger log = LoggerFactory.getLogger(IndexBatchBufferImpl.class);
+    private final Client client;
+    private final FailureMonitor failureMonitor;
+    private final IndexFig config;
+    private final boolean refresh; // refresh flag used by flush(); read once from config.isForcedRefresh()
+    private BulkRequestBuilder bulkRequest; // NOTE(review): mutable and unsynchronized on a @Singleton - confirm callers are single-threaded
+    private Producer producer;
+
+    public IndexBatchBufferImpl(final Client client, FailureMonitor failureMonitor,final IndexFig config){ // NOTE(review): no @Inject here although IndexModule binds this class - confirm Guice can construct it
+        this.client = client;
+        this.failureMonitor = failureMonitor;
+        this.config = config;
+        this.producer = new Producer();
+        this.refresh = config.isForcedRefresh();
+        init();
+    }
+
+    private void clearBulk() {
+        bulkRequest = client.prepareBulk(); // replace the current bulk request with a fresh, empty one
+    }
+
+    private void init() {
+        clearBulk();
+        Observable.create(producer)
+                .buffer(10) // group queued containers into lists of 10 before they reach doOnNext
+                .doOnNext(new Action1<List<RequestBuilderContainer>>() {
+                    @Override
+                    public void call(List<RequestBuilderContainer> builderContainerList) {
+                        System.out.println("test test!!!"); // NOTE(review): looks like leftover debug output; goes to stdout rather than log
+                        for (RequestBuilderContainer container : builderContainerList) {
+                            ShardReplicationOperationRequestBuilder builder = container.getBuilder();
+                            if (builder instanceof IndexRequestBuilder) {
+                                bulkRequest.add((IndexRequestBuilder) builder);
+                                continue;
+                            }
+                            if (builder instanceof DeleteRequestBuilder) {
+                                bulkRequest.add((DeleteRequestBuilder)builder);
+                                continue;
+                            }
+                        }
+                        execute(); // send everything just added, using the configured refresh flag
+                    }
+                })
+                .subscribe();
+    }
+
+    public void put(IndexRequestBuilder builder){
+        producer.put(new RequestBuilderContainer(builder));
+    }
+
+    public void put(DeleteRequestBuilder builder){
+        producer.put(new RequestBuilderContainer(builder));
+    }
+
+    public void flushAndRefresh(){
+        execute(bulkRequest.setRefresh(true)); // NOTE(review): only sends what doOnNext already added; items still held by buffer(10) appear to be left out - confirm
+    }
+    public void flush(){
+        execute(); // NOTE(review): same caveat as flushAndRefresh() - containers not yet emitted by buffer(10) appear not to be flushed
+    }
+
+    private void execute(){
+        execute(bulkRequest.setRefresh(refresh));
+    }
+
+    /**
+     * Execute the request if it holds any actions, report the outcome to the failure monitor, fail on the first failed item, then start a fresh bulk request.
+     */
+    private void execute( final BulkRequestBuilder request ) {
+        //nothing to do, we haven't added anything to the index
+        if ( request.numberOfActions() == 0 ) {
+            return;
+        }
+
+        final BulkResponse responses;
+
+        try {
+            responses = request.execute().actionGet();
+        }
+        catch ( Throwable t ) {
+            log.error( "Unable to communicate with elasticsearch" );
+            failureMonitor.fail( "Unable to execute batch", t );
+            throw t; // rethrown before clearBulk(), so bulkRequest is not reset on this path
+        }
+
+        failureMonitor.success();
+
+        for ( BulkItemResponse response : responses ) {
+            if ( response.isFailed() ) {
+                throw new RuntimeException( "Unable to index documents.  Errors are :" // thrown before clearBulk(), so bulkRequest is not reset on this path
+                        + response.getFailure().getMessage() );
+            }
+        }
+
+        clearBulk();
+    }
+
+    private static class Producer implements Observable.OnSubscribe<RequestBuilderContainer> {
+
+        private Subscriber<? super RequestBuilderContainer> subscriber; // assigned only in call(); null until then
+
+        @Override
+        public void call(Subscriber<? super RequestBuilderContainer> subscriber) {
+            this.subscriber = subscriber;
+        }
+
+        public void put(RequestBuilderContainer r){
+            subscriber.onNext(r); // hands the container straight to the captured subscriber
+        }
+    }
+
+    private static class RequestBuilderContainer{
+        private final ShardReplicationOperationRequestBuilder builder;
+
+        public RequestBuilderContainer(ShardReplicationOperationRequestBuilder builder){
+            this.builder = builder;
+        }
+
+        public ShardReplicationOperationRequestBuilder getBuilder(){
+            return builder;
+        }
+    }
+
+}