You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/02/26 00:17:12 UTC

[15/23] incubator-usergrid git commit: add batching

add batching


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/6276726c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/6276726c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/6276726c

Branch: refs/heads/USERGRID-409
Commit: 6276726cb20c8dd581b49a453b244dd38bc6b68c
Parents: afd22eb
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Feb 25 10:03:28 2015 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Feb 25 10:03:28 2015 -0700

----------------------------------------------------------------------
 .../index/impl/IndexBatchBufferImpl.java        |  2 +-
 .../persistence/index/impl/EntityIndexTest.java | 30 +++++++++++++-------
 2 files changed, 20 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6276726c/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
index b5d9528..73af70f 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
@@ -122,7 +122,7 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
         }
 
         final AtomicBoolean isForceRefresh = new AtomicBoolean(config.isForcedRefresh());
-        //clear the queue or proceed to buffersize
+        //clear the queue or proceed to buffer size
         Observable.from(containers)
                 .subscribeOn(Schedulers.io())
                 .flatMap(new Func1<RequestBuilderContainer, Observable<ShardReplicationOperationRequestBuilder>>() {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6276726c/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index e820ce1..918ad35 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -24,6 +24,7 @@ import java.util.*;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.usergrid.persistence.core.future.BetterFuture;
 import org.apache.usergrid.persistence.index.*;
 import org.apache.usergrid.persistence.index.query.CandidateResult;
 import org.apache.usergrid.persistence.index.utils.IndexValidationUtils;
@@ -95,18 +96,23 @@ public class EntityIndexTest extends BaseIT {
 
         long now = System.currentTimeMillis();
         final int threads = 1000;
+        final int size = 50;
         final EntityIndex entityIndex = eif.createEntityIndex( applicationScope );
         final IndexScope indexScope = new IndexScopeImpl(appId, "things");
         final String entityType = "thing";
         entityIndex.initializeIndex();
         final CountDownLatch latch = new CountDownLatch(threads);
         final AtomicLong failTime=new AtomicLong(0);
+        InputStream is = this.getClass().getResourceAsStream(  "/sample-large.json" );
+        ObjectMapper mapper = new ObjectMapper();
+        final List<Object> sampleJson = mapper.readValue( is, new TypeReference<List<Object>>() {} );
         for(int i=0;i<threads;i++) {
             Thread thread = new Thread(new Runnable() {
                 public void run() {
                     try {
-                        insertJsonBlob(entityIndex, entityType, indexScope, "/sample-small.json", 1, 0);
-                        entityIndex.refresh();
+                        EntityIndexBatch batch = entityIndex.createBatch();
+                        insertJsonBlob(sampleJson,batch, entityType, indexScope, size, 0);
+                        batch.execute().get();
                     } catch (Exception e) {
                         synchronized (failTime) {
                             if (failTime.get() == 0) {
@@ -127,7 +133,7 @@ public class EntityIndexTest extends BaseIT {
         }catch (InterruptedException ie){
             throw new RuntimeException(ie);
         }
-        assertTrue("system must have failed at "+(failTime.get() - now) ,failTime.get()==0);
+        assertTrue("system must have failed at " + (failTime.get() - now), failTime.get() == 0);
     }
 
     @Test
@@ -210,17 +216,20 @@ public class EntityIndexTest extends BaseIT {
         testQuery(indexScope, searchTypes, entityIndex, "name = 'Bowers Oneil'", 0);
 
     }
-
     private void insertJsonBlob(EntityIndex entityIndex, String entityType, IndexScope indexScope, String filePath,final int max,final int startIndex) throws IOException {
         InputStream is = this.getClass().getResourceAsStream( filePath );
         ObjectMapper mapper = new ObjectMapper();
         List<Object> sampleJson = mapper.readValue( is, new TypeReference<List<Object>>() {} );
+        EntityIndexBatch batch = entityIndex.createBatch();
+        insertJsonBlob(sampleJson,batch, entityType, indexScope, max,startIndex);
+        batch.executeAndRefresh().get();
+    }
 
+    private void insertJsonBlob(List<Object> sampleJson, EntityIndexBatch batch, String entityType, IndexScope indexScope,final int max,final int startIndex) throws IOException {
         int count = 0;
         StopWatch timer = new StopWatch();
         timer.start();
 
-        final EntityIndexBatch batch = entityIndex.createBatch();
 
         if(startIndex > 0){
             for(int i =0; i<startIndex;i++){
@@ -246,10 +255,9 @@ public class EntityIndexTest extends BaseIT {
             }
         }
 
-        batch.executeAndRefresh().get();
         timer.stop();
-        log.info( "Total time to index {} entries {}ms, average {}ms/entry",
-                new Object[] { count, timer.getTime(), timer.getTime() / count } );
+        log.info("Total time to index {} entries {}ms, average {}ms/entry",
+                new Object[]{count, timer.getTime(), timer.getTime() / count } );
     }
 
 
@@ -428,7 +436,7 @@ public class EntityIndexTest extends BaseIT {
         user.setField( new StringField( "address2", "apt 508" ) );
         batch.index( indexScope,  user );
         user.setField( new StringField( "address3", "apt 508" ) );
-        batch.index( indexScope,  user );
+        batch.index( indexScope,  user);
         batch.executeAndRefresh().get();
 
         CandidateResults results = entityIndex.getEntityVersions(indexScope,  user.getId() );
@@ -467,7 +475,7 @@ public class EntityIndexTest extends BaseIT {
 
         EntityIndexBatch batch = ei.createBatch();
 
-        batch.index( appScope, user );
+        batch.index( appScope, user);
         batch.executeAndRefresh().get();
 
         Query query = new Query();
@@ -534,7 +542,7 @@ public class EntityIndexTest extends BaseIT {
         Entity fred = EntityIndexMapUtils.fromMap( fredMap );
         EntityUtils.setId( fred, new SimpleId( UUIDGenerator.newTimeUUID(), "user"  ) );
         EntityUtils.setVersion( fred, UUIDGenerator.newTimeUUID() );
-        batch.index( appScope, fred );
+        batch.index( appScope, fred);
 
         batch.executeAndRefresh().get();